Merge branch 'master' into feature/task-functions
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
76d91ea0 8 Task
5c4d16da 9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
dc021bcc 14 average,
59317253 15 isKillBehavior,
0d80593b 16 isPlainObject,
90d6701c 17 max,
afe0d5bf 18 median,
90d6701c 19 min,
e4f20deb
JB
20 round,
21 updateMeasurementStatistics
bbeadd16 22} from '../utils'
59317253 23import { KillBehaviors } from '../worker/worker-options'
6703b9f4 24import type { TaskFunction } from '../worker/task-functions'
c4855468 25import {
65d7a1c9 26 type IPool,
7c5a1080 27 PoolEmitter,
c4855468 28 PoolEvents,
6b27d407 29 type PoolInfo,
c4855468 30 type PoolOptions,
6b27d407
JB
31 type PoolType,
32 PoolTypes,
4b628b48 33 type TasksQueueOptions
c4855468 34} from './pool'
bbfa38a2
JB
35import type {
36 IWorker,
37 IWorkerNode,
38 WorkerInfo,
39 WorkerType,
40 WorkerUsage
e102732c 41} from './worker'
a35560ba 42import {
008512c7 43 type MeasurementStatisticsRequirements,
f0d7f803 44 Measurements,
a35560ba 45 WorkerChoiceStrategies,
a20f0ba5
JB
46 type WorkerChoiceStrategy,
47 type WorkerChoiceStrategyOptions
bdaf31cd
JB
48} from './selection-strategies/selection-strategies-types'
49import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 50import { version } from './version'
4b628b48 51import { WorkerNode } from './worker-node'
23ccf9d7 52
729c563d 53/**
ea7a90d3 54 * Base class that implements some shared logic for all poolifier pools.
729c563d 55 *
38e795c1 56 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
57 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
58 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 59 */
c97c7edb 60export abstract class AbstractPool<
f06e48d8 61 Worker extends IWorker,
d3c8a1a8
S
62 Data = unknown,
63 Response = unknown
c4855468 64> implements IPool<Worker, Data, Response> {
afc003b2 65 /** @inheritDoc */
4b628b48 66 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 67
afc003b2 68 /** @inheritDoc */
7c0ba920
JB
69 public readonly emitter?: PoolEmitter
70
be0676b3 71 /**
52b71763 72 * The task execution response promise map.
be0676b3 73 *
2740a743 74 * - `key`: The message id of each submitted task.
a3445496 75 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 76 *
a3445496 77 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 78 */
501aea93
JB
79 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
80 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 81
a35560ba 82 /**
51fe3d3c 83 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
84 */
85 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
86 Worker,
87 Data,
88 Response
a35560ba
S
89 >
90
8735b4e5
JB
91 /**
92 * Dynamic pool maximum size property placeholder.
93 */
94 protected readonly max?: number
95
075e51d1 96 /**
adc9cc64 97 * Whether the pool is starting or not.
075e51d1
JB
98 */
99 private readonly starting: boolean
15b176e0
JB
100 /**
101 * Whether the pool is started or not.
102 */
103 private started: boolean
afe0d5bf
JB
104 /**
105 * The start timestamp of the pool.
106 */
107 private readonly startTimestamp
108
729c563d
S
109 /**
110 * Constructs a new poolifier pool.
111 *
38e795c1 112 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 113 * @param filePath - Path to the worker file.
38e795c1 114 * @param opts - Options for the pool.
729c563d 115 */
c97c7edb 116 public constructor (
b4213b7f
JB
117 protected readonly numberOfWorkers: number,
118 protected readonly filePath: string,
119 protected readonly opts: PoolOptions<Worker>
c97c7edb 120 ) {
78cea37e 121 if (!this.isMain()) {
04f45163 122 throw new Error(
8c6d4acf 123 'Cannot start a pool from a worker with the same type as the pool'
04f45163 124 )
c97c7edb 125 }
8d3782fa 126 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 127 this.checkFilePath(this.filePath)
7c0ba920 128 this.checkPoolOptions(this.opts)
1086026a 129
7254e419
JB
130 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
131 this.executeTask = this.executeTask.bind(this)
132 this.enqueueTask = this.enqueueTask.bind(this)
1086026a 133
6bd72cd0 134 if (this.opts.enableEvents === true) {
7c0ba920
JB
135 this.emitter = new PoolEmitter()
136 }
d59df138
JB
137 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
138 Worker,
139 Data,
140 Response
da309861
JB
141 >(
142 this,
143 this.opts.workerChoiceStrategy,
144 this.opts.workerChoiceStrategyOptions
145 )
b6b32453
JB
146
147 this.setupHook()
148
075e51d1 149 this.starting = true
e761c033 150 this.startPool()
075e51d1 151 this.starting = false
15b176e0 152 this.started = true
afe0d5bf
JB
153
154 this.startTimestamp = performance.now()
c97c7edb
S
155 }
156
a35560ba 157 private checkFilePath (filePath: string): void {
ffcbbad8
JB
158 if (
159 filePath == null ||
3d6dd312 160 typeof filePath !== 'string' ||
ffcbbad8
JB
161 (typeof filePath === 'string' && filePath.trim().length === 0)
162 ) {
c510fea7
APA
163 throw new Error('Please specify a file with a worker implementation')
164 }
3d6dd312
JB
165 if (!existsSync(filePath)) {
166 throw new Error(`Cannot find the worker file '${filePath}'`)
167 }
c510fea7
APA
168 }
169
8d3782fa
JB
170 private checkNumberOfWorkers (numberOfWorkers: number): void {
171 if (numberOfWorkers == null) {
172 throw new Error(
173 'Cannot instantiate a pool without specifying the number of workers'
174 )
78cea37e 175 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 176 throw new TypeError(
0d80593b 177 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
178 )
179 } else if (numberOfWorkers < 0) {
473c717a 180 throw new RangeError(
8d3782fa
JB
181 'Cannot instantiate a pool with a negative number of workers'
182 )
6b27d407 183 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
2431bdb4
JB
184 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
185 }
186 }
187
188 protected checkDynamicPoolSize (min: number, max: number): void {
079de991 189 if (this.type === PoolTypes.dynamic) {
a5ed75b7 190 if (max == null) {
e695d66f 191 throw new TypeError(
a5ed75b7
JB
192 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
193 )
194 } else if (!Number.isSafeInteger(max)) {
2761efb4
JB
195 throw new TypeError(
196 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
197 )
198 } else if (min > max) {
079de991
JB
199 throw new RangeError(
200 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
201 )
b97d82d8 202 } else if (max === 0) {
079de991 203 throw new RangeError(
d640b48b 204 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
079de991
JB
205 )
206 } else if (min === max) {
207 throw new RangeError(
208 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
209 )
210 }
8d3782fa
JB
211 }
212 }
213
7c0ba920 214 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
215 if (isPlainObject(opts)) {
216 this.opts.workerChoiceStrategy =
217 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
218 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
8990357d
JB
219 this.opts.workerChoiceStrategyOptions = {
220 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
221 ...opts.workerChoiceStrategyOptions
222 }
49be33fe
JB
223 this.checkValidWorkerChoiceStrategyOptions(
224 this.opts.workerChoiceStrategyOptions
225 )
1f68cede 226 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
227 this.opts.enableEvents = opts.enableEvents ?? true
228 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
229 if (this.opts.enableTasksQueue) {
230 this.checkValidTasksQueueOptions(
231 opts.tasksQueueOptions as TasksQueueOptions
232 )
233 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
234 opts.tasksQueueOptions as TasksQueueOptions
235 )
236 }
237 } else {
238 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 239 }
aee46736
JB
240 }
241
242 private checkValidWorkerChoiceStrategy (
243 workerChoiceStrategy: WorkerChoiceStrategy
244 ): void {
245 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 246 throw new Error(
aee46736 247 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
248 )
249 }
7c0ba920
JB
250 }
251
0d80593b
JB
252 private checkValidWorkerChoiceStrategyOptions (
253 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
254 ): void {
255 if (!isPlainObject(workerChoiceStrategyOptions)) {
256 throw new TypeError(
257 'Invalid worker choice strategy options: must be a plain object'
258 )
259 }
8990357d 260 if (
8c0b113f
JB
261 workerChoiceStrategyOptions.retries != null &&
262 !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
8990357d
JB
263 ) {
264 throw new TypeError(
8c0b113f 265 'Invalid worker choice strategy options: retries must be an integer'
8990357d
JB
266 )
267 }
268 if (
8c0b113f
JB
269 workerChoiceStrategyOptions.retries != null &&
270 workerChoiceStrategyOptions.retries < 0
8990357d
JB
271 ) {
272 throw new RangeError(
8c0b113f 273 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
8990357d
JB
274 )
275 }
49be33fe
JB
276 if (
277 workerChoiceStrategyOptions.weights != null &&
6b27d407 278 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
279 ) {
280 throw new Error(
281 'Invalid worker choice strategy options: must have a weight for each worker node'
282 )
283 }
f0d7f803
JB
284 if (
285 workerChoiceStrategyOptions.measurement != null &&
286 !Object.values(Measurements).includes(
287 workerChoiceStrategyOptions.measurement
288 )
289 ) {
290 throw new Error(
291 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
292 )
293 }
0d80593b
JB
294 }
295
a20f0ba5 296 private checkValidTasksQueueOptions (
76d91ea0 297 tasksQueueOptions: TasksQueueOptions
a20f0ba5 298 ): void {
0d80593b
JB
299 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
300 throw new TypeError('Invalid tasks queue options: must be a plain object')
301 }
f0d7f803 302 if (
b7d085c4
JB
303 tasksQueueOptions?.concurrency != null &&
304 !Number.isSafeInteger(tasksQueueOptions?.concurrency)
f0d7f803
JB
305 ) {
306 throw new TypeError(
20c6f652 307 'Invalid worker node tasks concurrency: must be an integer'
f0d7f803
JB
308 )
309 }
310 if (
b7d085c4
JB
311 tasksQueueOptions?.concurrency != null &&
312 tasksQueueOptions?.concurrency <= 0
f0d7f803 313 ) {
e695d66f 314 throw new RangeError(
b7d085c4 315 `Invalid worker node tasks concurrency: ${tasksQueueOptions?.concurrency} is a negative integer or zero`
20c6f652
JB
316 )
317 }
b7d085c4 318 if (tasksQueueOptions?.queueMaxSize != null) {
ff3f866a 319 throw new Error(
68dbcdc0 320 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
20c6f652
JB
321 )
322 }
323 if (
b7d085c4
JB
324 tasksQueueOptions?.size != null &&
325 !Number.isSafeInteger(tasksQueueOptions?.size)
20c6f652 326 ) {
ff3f866a 327 throw new TypeError(
68dbcdc0 328 'Invalid worker node tasks queue size: must be an integer'
ff3f866a
JB
329 )
330 }
b7d085c4 331 if (tasksQueueOptions?.size != null && tasksQueueOptions?.size <= 0) {
20c6f652 332 throw new RangeError(
b7d085c4 333 `Invalid worker node tasks queue size: ${tasksQueueOptions?.size} is a negative integer or zero`
a20f0ba5
JB
334 )
335 }
336 }
337
e761c033
JB
338 private startPool (): void {
339 while (
340 this.workerNodes.reduce(
341 (accumulator, workerNode) =>
342 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
343 0
344 ) < this.numberOfWorkers
345 ) {
aa9eede8 346 this.createAndSetupWorkerNode()
e761c033
JB
347 }
348 }
349
08f3f44c 350 /** @inheritDoc */
6b27d407
JB
351 public get info (): PoolInfo {
352 return {
23ccf9d7 353 version,
6b27d407 354 type: this.type,
184855e6 355 worker: this.worker,
2431bdb4
JB
356 ready: this.ready,
357 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
6b27d407
JB
358 minSize: this.minSize,
359 maxSize: this.maxSize,
c05f0d50
JB
360 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
361 .runTime.aggregate &&
1305e9a8
JB
362 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
363 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
364 workerNodes: this.workerNodes.length,
365 idleWorkerNodes: this.workerNodes.reduce(
366 (accumulator, workerNode) =>
f59e1027 367 workerNode.usage.tasks.executing === 0
a4e07f72
JB
368 ? accumulator + 1
369 : accumulator,
6b27d407
JB
370 0
371 ),
372 busyWorkerNodes: this.workerNodes.reduce(
373 (accumulator, workerNode) =>
f59e1027 374 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
375 0
376 ),
a4e07f72 377 executedTasks: this.workerNodes.reduce(
6b27d407 378 (accumulator, workerNode) =>
f59e1027 379 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
380 0
381 ),
382 executingTasks: this.workerNodes.reduce(
383 (accumulator, workerNode) =>
f59e1027 384 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
385 0
386 ),
daf86646
JB
387 ...(this.opts.enableTasksQueue === true && {
388 queuedTasks: this.workerNodes.reduce(
389 (accumulator, workerNode) =>
390 accumulator + workerNode.usage.tasks.queued,
391 0
392 )
393 }),
394 ...(this.opts.enableTasksQueue === true && {
395 maxQueuedTasks: this.workerNodes.reduce(
396 (accumulator, workerNode) =>
397 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
398 0
399 )
400 }),
a1763c54
JB
401 ...(this.opts.enableTasksQueue === true && {
402 backPressure: this.hasBackPressure()
403 }),
68cbdc84
JB
404 ...(this.opts.enableTasksQueue === true && {
405 stolenTasks: this.workerNodes.reduce(
406 (accumulator, workerNode) =>
407 accumulator + workerNode.usage.tasks.stolen,
408 0
409 )
410 }),
a4e07f72
JB
411 failedTasks: this.workerNodes.reduce(
412 (accumulator, workerNode) =>
f59e1027 413 accumulator + workerNode.usage.tasks.failed,
a4e07f72 414 0
1dcf8b7b
JB
415 ),
416 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
417 .runTime.aggregate && {
418 runTime: {
98e72cda 419 minimum: round(
90d6701c 420 min(
98e72cda 421 ...this.workerNodes.map(
041dc05b 422 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
98e72cda 423 )
1dcf8b7b
JB
424 )
425 ),
98e72cda 426 maximum: round(
90d6701c 427 max(
98e72cda 428 ...this.workerNodes.map(
041dc05b 429 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
98e72cda 430 )
1dcf8b7b 431 )
98e72cda 432 ),
3baa0837
JB
433 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
434 .runTime.average && {
435 average: round(
436 average(
437 this.workerNodes.reduce<number[]>(
438 (accumulator, workerNode) =>
439 accumulator.concat(workerNode.usage.runTime.history),
440 []
441 )
98e72cda 442 )
dc021bcc 443 )
3baa0837 444 }),
98e72cda
JB
445 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
446 .runTime.median && {
447 median: round(
448 median(
3baa0837
JB
449 this.workerNodes.reduce<number[]>(
450 (accumulator, workerNode) =>
451 accumulator.concat(workerNode.usage.runTime.history),
452 []
98e72cda
JB
453 )
454 )
455 )
456 })
1dcf8b7b
JB
457 }
458 }),
459 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
460 .waitTime.aggregate && {
461 waitTime: {
98e72cda 462 minimum: round(
90d6701c 463 min(
98e72cda 464 ...this.workerNodes.map(
041dc05b 465 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
98e72cda 466 )
1dcf8b7b
JB
467 )
468 ),
98e72cda 469 maximum: round(
90d6701c 470 max(
98e72cda 471 ...this.workerNodes.map(
041dc05b 472 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
98e72cda 473 )
1dcf8b7b 474 )
98e72cda 475 ),
3baa0837
JB
476 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
477 .waitTime.average && {
478 average: round(
479 average(
480 this.workerNodes.reduce<number[]>(
481 (accumulator, workerNode) =>
482 accumulator.concat(workerNode.usage.waitTime.history),
483 []
484 )
98e72cda 485 )
dc021bcc 486 )
3baa0837 487 }),
98e72cda
JB
488 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
489 .waitTime.median && {
490 median: round(
491 median(
3baa0837
JB
492 this.workerNodes.reduce<number[]>(
493 (accumulator, workerNode) =>
494 accumulator.concat(workerNode.usage.waitTime.history),
495 []
98e72cda
JB
496 )
497 )
498 )
499 })
1dcf8b7b
JB
500 }
501 })
6b27d407
JB
502 }
503 }
08f3f44c 504
aa9eede8
JB
505 /**
506 * The pool readiness boolean status.
507 */
2431bdb4
JB
508 private get ready (): boolean {
509 return (
b97d82d8
JB
510 this.workerNodes.reduce(
511 (accumulator, workerNode) =>
512 !workerNode.info.dynamic && workerNode.info.ready
513 ? accumulator + 1
514 : accumulator,
515 0
516 ) >= this.minSize
2431bdb4
JB
517 )
518 }
519
afe0d5bf 520 /**
aa9eede8 521 * The approximate pool utilization.
afe0d5bf
JB
522 *
523 * @returns The pool utilization.
524 */
525 private get utilization (): number {
8e5ca040 526 const poolTimeCapacity =
fe7d90db 527 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
528 const totalTasksRunTime = this.workerNodes.reduce(
529 (accumulator, workerNode) =>
71514351 530 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
531 0
532 )
533 const totalTasksWaitTime = this.workerNodes.reduce(
534 (accumulator, workerNode) =>
71514351 535 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
536 0
537 )
8e5ca040 538 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
539 }
540
8881ae32 541 /**
aa9eede8 542 * The pool type.
8881ae32
JB
543 *
544 * If it is `'dynamic'`, it provides the `max` property.
545 */
546 protected abstract get type (): PoolType
547
184855e6 548 /**
aa9eede8 549 * The worker type.
184855e6
JB
550 */
551 protected abstract get worker (): WorkerType
552
c2ade475 553 /**
aa9eede8 554 * The pool minimum size.
c2ade475 555 */
8735b4e5
JB
556 protected get minSize (): number {
557 return this.numberOfWorkers
558 }
ff733df7
JB
559
560 /**
aa9eede8 561 * The pool maximum size.
ff733df7 562 */
8735b4e5
JB
563 protected get maxSize (): number {
564 return this.max ?? this.numberOfWorkers
565 }
a35560ba 566
6b813701
JB
567 /**
568 * Checks if the worker id sent in the received message from a worker is valid.
569 *
570 * @param message - The received message.
571 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
572 */
21f710aa 573 private checkMessageWorkerId (message: MessageValue<Response>): void {
310de0aa
JB
574 if (message.workerId == null) {
575 throw new Error('Worker message received without worker id')
576 } else if (
21f710aa 577 message.workerId != null &&
aad6fb64 578 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
21f710aa
JB
579 ) {
580 throw new Error(
581 `Worker message received from unknown worker '${message.workerId}'`
582 )
583 }
584 }
585
ffcbbad8 586 /**
f06e48d8 587 * Gets the given worker its worker node key.
ffcbbad8
JB
588 *
589 * @param worker - The worker.
f59e1027 590 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 591 */
aad6fb64 592 private getWorkerNodeKeyByWorker (worker: Worker): number {
f06e48d8 593 return this.workerNodes.findIndex(
041dc05b 594 workerNode => workerNode.worker === worker
f06e48d8 595 )
bf9549ae
JB
596 }
597
aa9eede8
JB
598 /**
599 * Gets the worker node key given its worker id.
600 *
601 * @param workerId - The worker id.
aad6fb64 602 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
aa9eede8 603 */
aad6fb64
JB
604 private getWorkerNodeKeyByWorkerId (workerId: number): number {
605 return this.workerNodes.findIndex(
041dc05b 606 workerNode => workerNode.info.id === workerId
aad6fb64 607 )
aa9eede8
JB
608 }
609
afc003b2 610 /** @inheritDoc */
a35560ba 611 public setWorkerChoiceStrategy (
59219cbb
JB
612 workerChoiceStrategy: WorkerChoiceStrategy,
613 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 614 ): void {
aee46736 615 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 616 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
617 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
618 this.opts.workerChoiceStrategy
619 )
620 if (workerChoiceStrategyOptions != null) {
621 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
622 }
aa9eede8 623 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
4b628b48 624 workerNode.resetUsage()
9edb9717 625 this.sendStatisticsMessageToWorker(workerNodeKey)
59219cbb 626 }
a20f0ba5
JB
627 }
628
629 /** @inheritDoc */
630 public setWorkerChoiceStrategyOptions (
631 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
632 ): void {
0d80593b 633 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
8990357d
JB
634 this.opts.workerChoiceStrategyOptions = {
635 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
636 ...workerChoiceStrategyOptions
637 }
a20f0ba5
JB
638 this.workerChoiceStrategyContext.setOptions(
639 this.opts.workerChoiceStrategyOptions
a35560ba
S
640 )
641 }
642
a20f0ba5 643 /** @inheritDoc */
8f52842f
JB
644 public enableTasksQueue (
645 enable: boolean,
646 tasksQueueOptions?: TasksQueueOptions
647 ): void {
a20f0ba5 648 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 649 this.flushTasksQueues()
a20f0ba5
JB
650 }
651 this.opts.enableTasksQueue = enable
8f52842f 652 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
653 }
654
655 /** @inheritDoc */
8f52842f 656 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 657 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
658 this.checkValidTasksQueueOptions(tasksQueueOptions)
659 this.opts.tasksQueueOptions =
660 this.buildTasksQueueOptions(tasksQueueOptions)
5b49e864 661 this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
5baee0d7 662 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
663 delete this.opts.tasksQueueOptions
664 }
665 }
666
5b49e864 667 private setTasksQueueSize (size: number): void {
20c6f652 668 for (const workerNode of this.workerNodes) {
ff3f866a 669 workerNode.tasksQueueBackPressureSize = size
20c6f652
JB
670 }
671 }
672
a20f0ba5
JB
673 private buildTasksQueueOptions (
674 tasksQueueOptions: TasksQueueOptions
675 ): TasksQueueOptions {
676 return {
20c6f652 677 ...{
ff3f866a 678 size: Math.pow(this.maxSize, 2),
20c6f652
JB
679 concurrency: 1
680 },
681 ...tasksQueueOptions
a20f0ba5
JB
682 }
683 }
684
c319c66b
JB
685 /**
686 * Whether the pool is full or not.
687 *
688 * The pool filling boolean status.
689 */
dea903a8
JB
690 protected get full (): boolean {
691 return this.workerNodes.length >= this.maxSize
692 }
c2ade475 693
c319c66b
JB
694 /**
695 * Whether the pool is busy or not.
696 *
697 * The pool busyness boolean status.
698 */
699 protected abstract get busy (): boolean
7c0ba920 700
6c6afb84 701 /**
3d76750a 702 * Whether worker nodes are executing concurrently their tasks quota or not.
6c6afb84
JB
703 *
704 * @returns Worker nodes busyness boolean status.
705 */
c2ade475 706 protected internalBusy (): boolean {
3d76750a
JB
707 if (this.opts.enableTasksQueue === true) {
708 return (
709 this.workerNodes.findIndex(
041dc05b 710 workerNode =>
3d76750a
JB
711 workerNode.info.ready &&
712 workerNode.usage.tasks.executing <
713 (this.opts.tasksQueueOptions?.concurrency as number)
714 ) === -1
715 )
716 } else {
717 return (
718 this.workerNodes.findIndex(
041dc05b 719 workerNode =>
3d76750a
JB
720 workerNode.info.ready && workerNode.usage.tasks.executing === 0
721 ) === -1
722 )
723 }
cb70b19d
JB
724 }
725
6703b9f4
JB
726 private sendToWorkers (message: Omit<MessageValue<Data>, 'workerId'>): number {
727 let messagesCount = 0
728 for (const [workerNodeKey] of this.workerNodes.entries()) {
729 this.sendToWorker(workerNodeKey, {
730 ...message,
731 workerId: this.getWorkerInfo(workerNodeKey).id as number
732 })
733 ++messagesCount
734 }
735 return messagesCount
736 }
737
738 /** @inheritDoc */
739 public hasTaskFunction (name: string): boolean {
740 this.sendToWorkers({
741 taskFunctionOperation: 'has',
742 taskFunctionName: name
743 })
744 return true
745 }
746
747 /** @inheritDoc */
748 public addTaskFunction (name: string, taskFunction: TaskFunction): boolean {
749 this.sendToWorkers({
750 taskFunctionOperation: 'add',
751 taskFunctionName: name,
752 taskFunction: taskFunction.toString()
753 })
754 return true
755 }
756
757 /** @inheritDoc */
758 public removeTaskFunction (name: string): boolean {
759 this.sendToWorkers({
760 taskFunctionOperation: 'remove',
761 taskFunctionName: name
762 })
763 return true
764 }
765
90d7d101 766 /** @inheritDoc */
6703b9f4 767 public listTaskFunctionNames (): string[] {
f2dbbf95
JB
768 for (const workerNode of this.workerNodes) {
769 if (
6703b9f4
JB
770 Array.isArray(workerNode.info.taskFunctionNames) &&
771 workerNode.info.taskFunctionNames.length > 0
f2dbbf95 772 ) {
6703b9f4 773 return workerNode.info.taskFunctionNames
f2dbbf95 774 }
90d7d101 775 }
f2dbbf95 776 return []
90d7d101
JB
777 }
778
6703b9f4
JB
779 /** @inheritDoc */
780 public setDefaultTaskFunction (name: string): boolean {
781 this.sendToWorkers({
782 taskFunctionOperation: 'default',
783 taskFunctionName: name
784 })
785 return true
786 }
787
375f7504
JB
788 private shallExecuteTask (workerNodeKey: number): boolean {
789 return (
790 this.tasksQueueSize(workerNodeKey) === 0 &&
791 this.workerNodes[workerNodeKey].usage.tasks.executing <
792 (this.opts.tasksQueueOptions?.concurrency as number)
793 )
794 }
795
afc003b2 796 /** @inheritDoc */
7d91a8cd
JB
797 public async execute (
798 data?: Data,
799 name?: string,
800 transferList?: TransferListItem[]
801 ): Promise<Response> {
52b71763 802 return await new Promise<Response>((resolve, reject) => {
15b176e0
JB
803 if (!this.started) {
804 reject(new Error('Cannot execute a task on destroyed pool'))
9d2d0da1 805 return
15b176e0 806 }
7d91a8cd
JB
807 if (name != null && typeof name !== 'string') {
808 reject(new TypeError('name argument must be a string'))
9d2d0da1 809 return
7d91a8cd 810 }
90d7d101
JB
811 if (
812 name != null &&
813 typeof name === 'string' &&
814 name.trim().length === 0
815 ) {
f58b60b9 816 reject(new TypeError('name argument must not be an empty string'))
9d2d0da1 817 return
90d7d101 818 }
b558f6b5
JB
819 if (transferList != null && !Array.isArray(transferList)) {
820 reject(new TypeError('transferList argument must be an array'))
9d2d0da1 821 return
b558f6b5
JB
822 }
823 const timestamp = performance.now()
824 const workerNodeKey = this.chooseWorkerNode()
501aea93 825 const task: Task<Data> = {
52b71763
JB
826 name: name ?? DEFAULT_TASK_NAME,
827 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
828 data: data ?? ({} as Data),
7d91a8cd 829 transferList,
52b71763 830 timestamp,
1a28f967 831 workerId: this.getWorkerInfo(workerNodeKey).id as number,
7629bdf1 832 taskId: randomUUID()
52b71763 833 }
7629bdf1 834 this.promiseResponseMap.set(task.taskId as string, {
2e81254d
JB
835 resolve,
836 reject,
501aea93 837 workerNodeKey
2e81254d 838 })
52b71763 839 if (
4e377863
JB
840 this.opts.enableTasksQueue === false ||
841 (this.opts.enableTasksQueue === true &&
375f7504 842 this.shallExecuteTask(workerNodeKey))
52b71763 843 ) {
501aea93 844 this.executeTask(workerNodeKey, task)
4e377863
JB
845 } else {
846 this.enqueueTask(workerNodeKey, task)
52b71763 847 }
2e81254d 848 })
280c2a77 849 }
c97c7edb 850
afc003b2 851 /** @inheritDoc */
c97c7edb 852 public async destroy (): Promise<void> {
1fbcaa7c 853 await Promise.all(
81c02522 854 this.workerNodes.map(async (_, workerNodeKey) => {
aa9eede8 855 await this.destroyWorkerNode(workerNodeKey)
1fbcaa7c
JB
856 })
857 )
33e6bb4c 858 this.emitter?.emit(PoolEvents.destroy, this.info)
15b176e0 859 this.started = false
c97c7edb
S
860 }
861
1e3214b6
JB
862 protected async sendKillMessageToWorker (
863 workerNodeKey: number,
864 workerId: number
865 ): Promise<void> {
9edb9717 866 await new Promise<void>((resolve, reject) => {
041dc05b 867 this.registerWorkerMessageListener(workerNodeKey, message => {
1e3214b6
JB
868 if (message.kill === 'success') {
869 resolve()
870 } else if (message.kill === 'failure') {
e1af34e6 871 reject(new Error(`Worker ${workerId} kill message handling failed`))
1e3214b6
JB
872 }
873 })
9edb9717 874 this.sendToWorker(workerNodeKey, { kill: true, workerId })
1e3214b6 875 })
1e3214b6
JB
876 }
877
4a6952ff 878 /**
aa9eede8 879 * Terminates the worker node given its worker node key.
4a6952ff 880 *
aa9eede8 881 * @param workerNodeKey - The worker node key.
4a6952ff 882 */
81c02522 883 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 884
729c563d 885 /**
6677a3d3
JB
886 * Setup hook to execute code before worker nodes are created in the abstract constructor.
887 * Can be overridden.
afc003b2
JB
888 *
889 * @virtual
729c563d 890 */
280c2a77 891 protected setupHook (): void {
965df41c 892 /* Intentionally empty */
280c2a77 893 }
c97c7edb 894
729c563d 895 /**
280c2a77
S
896 * Should return whether the worker is the main worker or not.
897 */
898 protected abstract isMain (): boolean
899
900 /**
2e81254d 901 * Hook executed before the worker task execution.
bf9549ae 902 * Can be overridden.
729c563d 903 *
f06e48d8 904 * @param workerNodeKey - The worker node key.
1c6fe997 905 * @param task - The task to execute.
729c563d 906 */
1c6fe997
JB
907 protected beforeTaskExecutionHook (
908 workerNodeKey: number,
909 task: Task<Data>
910 ): void {
94407def
JB
911 if (this.workerNodes[workerNodeKey]?.usage != null) {
912 const workerUsage = this.workerNodes[workerNodeKey].usage
913 ++workerUsage.tasks.executing
914 this.updateWaitTimeWorkerUsage(workerUsage, task)
915 }
916 if (
917 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
918 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
919 task.name as string
920 ) != null
921 ) {
db0e38ee 922 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 923 workerNodeKey
db0e38ee 924 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
5623b8d5
JB
925 ++taskFunctionWorkerUsage.tasks.executing
926 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
b558f6b5 927 }
c97c7edb
S
928 }
929
/**
 * Hook run once a task execution response has been received.
 * Can be overridden.
 *
 * Updates the task counters, run time and ELU statistics, first on the
 * worker node usage and then, when applicable, on the usage tracked for the
 * task function named in the message task performance.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const nodeUsage = this.workerNodes[workerNodeKey]?.usage
  if (nodeUsage != null) {
    this.updateTaskStatisticsWorkerUsage(nodeUsage, message)
    this.updateRunTimeWorkerUsage(nodeUsage, message)
    this.updateEluWorkerUsage(nodeUsage, message)
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionUsage = this.workerNodes[
    workerNodeKey
  ].getTaskFunctionWorkerUsage(message.taskPerformance?.name as string)
  if (taskFunctionUsage != null) {
    this.updateTaskStatisticsWorkerUsage(taskFunctionUsage, message)
    this.updateRunTimeWorkerUsage(taskFunctionUsage, message)
    this.updateEluWorkerUsage(taskFunctionUsage, message)
  }
}
963
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * Returns `false` instead of throwing when no worker node exists at the given
 * key, so that callers guarding against a missing worker node (e.g. the task
 * execution hooks) can rely on this predicate.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  // Optional chaining keeps the null guard below reachable: dereferencing
  // `.info` on a missing worker node would throw before the guard runs.
  const workerInfo = this.workerNodes[workerNodeKey]?.info
  return (
    workerInfo != null &&
    Array.isArray(workerInfo.taskFunctionNames) &&
    // NOTE(review): the threshold of 2 presumably accounts for default entries
    // in the task function names list - confirm against the worker side.
    workerInfo.taskFunctionNames.length > 2
  )
}
978
/**
 * Updates the task counters of the given worker usage from a task execution response.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  // Never let the executing counter go below zero.
  if (tasks.executing != null && tasks.executing > 0) {
    --tasks.executing
  }
  if (message.workerError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
996
/**
 * Updates the run time statistics of the given worker usage.
 * Nothing is recorded when the message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError == null) {
    updateMeasurementStatistics(
      workerUsage.runTime,
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
      message.taskPerformance?.runTime ?? 0
    )
  }
}
1010
/**
 * Updates the wait time statistics of the given worker usage.
 * The wait time is the delay between the task timestamp and now; a task
 * without timestamp accounts for a zero wait time.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const taskTimestamp = task.timestamp ?? now
  updateMeasurementStatistics(
    workerUsage.waitTime,
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
    now - taskTimestamp
  )
}
1023
/**
 * Updates the event loop utilization (ELU) statistics of the given worker usage.
 * Nothing is recorded when the message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError != null) {
    return
  }
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0
  )
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  // Utilization is averaged with the previous value, or initialized on first report.
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
1056
/**
 * Chooses a worker node for the next task.
 *
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 * When a dynamic worker gets created and the strategy policy allows using it
 * straight away, its key is returned without consulting the strategy.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1075
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
1084
/**
 * Sends a message to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1097
/**
 * Creates a new worker.
 *
 * @returns The newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 1104
/**
 * Creates a new, completely set up worker node.
 *
 * The worker gets the user supplied handlers (or no-ops) attached, plus
 * internal `error` and `exit` listeners, before being added to the pool
 * worker nodes.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    // Flag the worker as not ready and close its channel before notifying.
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    const shallRestartWorker =
      this.opts.restartWorkerOnError === true &&
      this.started &&
      !this.starting
    if (shallRestartWorker) {
      // Replace the failed worker with one of the same kind.
      if (failedWorkerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const addedWorkerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(addedWorkerNodeKey)
  return addedWorkerNodeKey
}
be0676b3 1148
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular setup, a message listener destroys the worker node
 * when it receives a hard kill message, or a soft kill message while the
 * worker has no executing task (and no queued task if the tasks queue is
 * enabled).
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const senderUsage = this.workerNodes[senderWorkerNodeKey].usage
    // Evaluated lazily, only when a soft kill message is received.
    const hasNoPendingTask = (): boolean =>
      (this.opts.enableTasksQueue === false &&
        senderUsage.tasks.executing === 0) ||
      (this.opts.enableTasksQueue === true &&
        senderUsage.tasks.executing === 0 &&
        this.tasksQueueSize(senderWorkerNodeKey) === 0)
    // Kill message received from worker
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && hasNoPendingTask())
    ) {
      this.destroyWorkerNode(senderWorkerNodeKey).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: workerInfo.id as number
  })
  workerInfo.dynamic = true
  // Some strategy policies require the dynamic worker to be flagged ready at once.
  if (
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
  ) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1191
/**
 * Registers a message listener callback on the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053
JB
1204
/**
 * Method hooked up after a worker node has been newly created.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  // Wire the tasks stealing callbacks on the worker node.
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
}
1225
/**
 * Sends the startup message to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1232
/**
 * Sends the statistics message to the worker identified by its worker node key.
 * The message carries the `aggregate` flags of the run time and ELU task
 * statistics requirements.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const runTimeAggregate =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const eluAggregate =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTimeAggregate,
      elu: eluAggregate
    },
    workerId
  })
}
a2ed5053
JB
1250
/**
 * Moves every task queued on the given worker node to other worker nodes.
 * Each task goes to the ready worker node with the fewest queued tasks
 * (the search starts from worker node key 0), where it is either executed
 * or enqueued.
 *
 * @param workerNodeKey - The key of the worker node whose tasks queue is emptied.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = 0
    for (const [candidateKey, candidate] of this.workerNodes.entries()) {
      if (
        candidate.info.ready &&
        candidate.usage.tasks.queued <
          this.workerNodes[destinationWorkerNodeKey].usage.tasks.queued
      ) {
        destinationWorkerNodeKey = candidateKey
      }
    }
    const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
    // Re-target the task to the destination worker.
    const redistributedTask = {
      ...(this.dequeueTask(workerNodeKey) as Task<Data>),
      workerId: destinationWorkerNode.info.id as number
    }
    if (this.shallExecuteTask(destinationWorkerNodeKey)) {
      this.executeTask(destinationWorkerNodeKey, redistributedTask)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, redistributedTask)
    }
  }
}
1275
/**
 * Increments the stolen tasks counters of the given worker node: the worker
 * node usage and, when applicable, the usage tracked for the task function.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    ++workerNode.usage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionUsage = workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionUsage != null) {
    ++taskFunctionUsage.tasks.stolen
  }
}
1294
/**
 * Steals one task for the worker whose queue got empty.
 * The task is popped from the ready worker node, other than the requesting
 * one, that has the most queued tasks, then executed or enqueued on the
 * requesting worker node.
 *
 * @param workerId - The id of the worker whose tasks queue is empty.
 */
private taskStealingOnEmptyQueue (workerId: number): void {
  const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
  // Most loaded worker nodes first; the copy keeps the pool order untouched.
  const byQueuedTasksDescending = [...this.workerNodes].sort(
    (nodeA, nodeB) => nodeB.usage.tasks.queued - nodeA.usage.tasks.queued
  )
  const sourceWorkerNode = byQueuedTasksDescending.find(
    candidate =>
      candidate.info.ready &&
      candidate.info.id !== workerId &&
      candidate.usage.tasks.queued > 0
  )
  if (sourceWorkerNode == null) {
    return
  }
  const stolenTask = {
    ...(sourceWorkerNode.popTask() as Task<Data>),
    workerId: destinationWorkerNode.info.id as number
  }
  if (this.shallExecuteTask(destinationWorkerNodeKey)) {
    this.executeTask(destinationWorkerNodeKey, stolenTask)
  } else {
    this.enqueueTask(destinationWorkerNodeKey, stolenTask)
  }
  this.updateTaskStolenStatisticsWorkerUsage(
    destinationWorkerNodeKey,
    stolenTask.name as string
  )
}
1326
/**
 * Offloads tasks from a worker node under back pressure to other worker nodes.
 *
 * Candidate worker nodes are visited from the least to the most loaded. Each
 * ready candidate, other than the source, whose queue holds fewer tasks than
 * the configured tasks queue size minus one receives one task popped from the
 * source, as long as the source still has queued tasks.
 *
 * @param workerId - The id of the worker under back pressure.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const sizeOffset = 1
  const tasksQueueMaxSize = this.opts.tasksQueueOptions?.size as number
  if (tasksQueueMaxSize <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued < tasksQueueMaxSize - sizeOffset
    ) {
      // The position in the sorted copy is not a worker node key: resolve the
      // key from the pool worker nodes so the task reaches the intended worker.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      const task = {
        ...(sourceWorkerNode.popTask() as Task<Data>),
        workerId: workerNode.info.id as number
      }
      if (this.shallExecuteTask(workerNodeKey)) {
        this.executeTask(workerNodeKey, task)
      } else {
        this.enqueueTask(workerNodeKey, task)
      }
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
    }
  }
}
1364
/**
 * Builds the listener registered for each worker message.
 * The listener dispatches on the message content: worker ready response,
 * task execution response or task function names update.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    const { ready, taskId, taskFunctionNames } = message
    if (ready != null && taskFunctionNames != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctionNames != null) {
      // Task function names message received from worker
      const workerInfo = this.getWorkerInfo(
        this.getWorkerNodeKeyByWorkerId(message.workerId)
      )
      workerInfo.taskFunctionNames = taskFunctionNames
    }
    // Task function operation responses are currently not handled here.
  }
}
1389
/**
 * Handles a worker ready response: stores the ready flag and the task
 * function names on the worker info, then emits the pool `ready` event once
 * the pool itself is ready.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports it failed to initialize.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, taskFunctionNames, workerId } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId} failed to initialize`)
  }
  const workerInfo = this.getWorkerInfo(
    this.getWorkerNodeKeyByWorkerId(workerId)
  )
  workerInfo.ready = ready as boolean
  workerInfo.taskFunctionNames = taskFunctionNames
  if (this.emitter != null && this.ready) {
    this.emitter.emit(PoolEvents.ready, this.info)
  }
}
1403
/**
 * Handles a task execution response: settles the pending task promise,
 * updates the worker usage and worker choice strategy, then starts the next
 * queued task on the same worker node if its concurrency allows it.
 * Responses without a pending promise are ignored.
 *
 * @param message - The received message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { taskId, workerError, data } = message
  const promiseResponse = this.promiseResponseMap.get(taskId as string)
  if (promiseResponse == null) {
    return
  }
  if (workerError == null) {
    promiseResponse.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    promiseResponse.reject(workerError.message)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.promiseResponseMap.delete(taskId as string)
  const shallExecuteQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (shallExecuteQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
}
7c0ba920 1431
/**
 * Emits the pool `busy` event when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1437
/**
 * Emits the pool `backPressure` event when the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1443
/**
 * Emits the pool `full` event when the pool is dynamic and full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1451
/**
 * Gets the information of the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1461
/**
 * Adds the given worker in the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  // Falls back to the square of the pool maximum size when no queue size is configured.
  const tasksQueueSizeOption =
    this.opts.tasksQueueOptions?.size ?? this.maxSize ** 2
  const workerNode = new WorkerNode<Worker, Data>(worker, tasksQueueSizeOption)
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    workerNode.info.ready = true
  }
  this.workerNodes.push(workerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey === -1) {
    throw new Error('Worker added not found in worker nodes')
  }
  return addedWorkerNodeKey
}
c923ce56 1485
/**
 * Removes the given worker from the pool worker nodes and from the worker
 * choice strategy context. Does nothing if the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
adc3c320 1498
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Back pressure only exists when the tasks queue is enabled.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1506
/**
 * Whether the pool has back pressure: the tasks queue is enabled and no
 * worker node is free of back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1515
/**
 * Executes the given task on the worker identified by its worker node key:
 * runs the before execution hook, sends the task to the worker, then emits
 * the task execution related pool events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1527
/**
 * Enqueues the given task on the worker node, then emits the task queuing
 * related pool events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSizeAfterEnqueue = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSizeAfterEnqueue
}
1533
/**
 * Dequeues a task from the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1537
/**
 * Gets the tasks queue size of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1541
/**
 * Executes every task queued on the given worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  let remainingTasks = this.tasksQueueSize(workerNodeKey)
  while (remainingTasks > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
    remainingTasks = this.tasksQueueSize(workerNodeKey)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1551
/**
 * Flushes the tasks queue of every worker node in the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 1557}