Merge branch 'master' of github.com:poolifier/poolifier
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
76d91ea0 8 Task
5c4d16da 9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
dc021bcc 14 average,
59317253 15 isKillBehavior,
0d80593b 16 isPlainObject,
90d6701c 17 max,
afe0d5bf 18 median,
90d6701c 19 min,
e4f20deb
JB
20 round,
21 updateMeasurementStatistics
bbeadd16 22} from '../utils'
59317253 23import { KillBehaviors } from '../worker/worker-options'
c4855468 24import {
65d7a1c9 25 type IPool,
7c5a1080 26 PoolEmitter,
c4855468 27 PoolEvents,
6b27d407 28 type PoolInfo,
c4855468 29 type PoolOptions,
6b27d407
JB
30 type PoolType,
31 PoolTypes,
4b628b48 32 type TasksQueueOptions
c4855468 33} from './pool'
bbfa38a2
JB
34import type {
35 IWorker,
36 IWorkerNode,
37 WorkerInfo,
38 WorkerType,
39 WorkerUsage
e102732c 40} from './worker'
a35560ba 41import {
008512c7 42 type MeasurementStatisticsRequirements,
f0d7f803 43 Measurements,
a35560ba 44 WorkerChoiceStrategies,
a20f0ba5
JB
45 type WorkerChoiceStrategy,
46 type WorkerChoiceStrategyOptions
bdaf31cd
JB
47} from './selection-strategies/selection-strategies-types'
48import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 49import { version } from './version'
4b628b48 50import { WorkerNode } from './worker-node'
23ccf9d7 51
729c563d 52/**
ea7a90d3 53 * Base class that implements some shared logic for all poolifier pools.
729c563d 54 *
38e795c1 55 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
56 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
57 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 58 */
c97c7edb 59export abstract class AbstractPool<
f06e48d8 60 Worker extends IWorker,
d3c8a1a8
S
61 Data = unknown,
62 Response = unknown
c4855468 63> implements IPool<Worker, Data, Response> {
afc003b2 64 /** @inheritDoc */
4b628b48 65 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 66
afc003b2 67 /** @inheritDoc */
7c0ba920
JB
68 public readonly emitter?: PoolEmitter
69
be0676b3 70 /**
192566ec 71 * The task execution response promise map:
2740a743 72 * - `key`: The message id of each submitted task.
a3445496 73 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 74 *
a3445496 75 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 76 */
501aea93
JB
77 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
78 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 79
a35560ba 80 /**
51fe3d3c 81 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
82 */
83 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
84 Worker,
85 Data,
86 Response
a35560ba
S
87 >
88
8735b4e5
JB
89 /**
90 * Dynamic pool maximum size property placeholder.
91 */
92 protected readonly max?: number
93
15b176e0
JB
94 /**
95 * Whether the pool is started or not.
96 */
97 private started: boolean
47352846
JB
98 /**
99 * Whether the pool is starting or not.
100 */
101 private starting: boolean
afe0d5bf
JB
102 /**
103 * The start timestamp of the pool.
104 */
105 private readonly startTimestamp
106
/**
 * Builds a new poolifier pool.
 *
 * The arguments are validated first, then the worker choice strategy context is created,
 * the setup hook is run and, when `opts.startWorkers` is `true`, the pool is started.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `isMain()` reports that the pool is not created from the main worker.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  const createdFromMain = this.isMain()
  if (!createdFromMain) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation. checkPoolOptions() also writes the default values into `opts`.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Pin `this` on the methods below by rebinding them on the instance.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  // The emitter only exists when events are enabled.
  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  // Extension point run before any worker node is created.
  this.setupHook()

  this.started = false
  this.starting = false
  if (this.opts.startWorkers === true) {
    this.start()
  }

  // Reference point used by the `utilization` getter.
  this.startTimestamp = performance.now()
}
155
/**
 * Validates the worker file path.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is missing, is not a string, is blank, or does not exist on the file system.
 */
private checkFilePath (filePath: string): void {
  // A usable path is a string with at least one non whitespace character.
  const isUsablePath =
    typeof filePath === 'string' && filePath.trim().length > 0
  if (!isUsablePath) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileFound = existsSync(filePath)
  if (!workerFileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
168
/**
 * Validates the number of workers given at pool construction.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Zero is only rejected for fixed pools.
  const isFixedPool = this.type === PoolTypes.fixed
  if (isFixedPool && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
186
/**
 * Validates the minimum and maximum sizes of a dynamic pool.
 * Does nothing when the pool type is not dynamic.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the maximum size is not specified or is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum size is zero, inferior to the minimum size or equal to it.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
212
/**
 * Validates the pool options and writes the default values of the unset ones into `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.startWorkers = opts.startWorkers ?? true

  // Worker choice strategy: round robin unless specified otherwise.
  const workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)

  // User supplied strategy options are layered over the defaults, then validated.
  this.opts.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const userTasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(userTasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
241
/**
 * Validates that the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
251
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or `retries` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `retries` is negative.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of `weights` differs from the pool maximum size or `measurement` is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  // Only read the fields once the options are known to be a plain object.
  const { retries, weights, measurement } = workerChoiceStrategyOptions
  if (retries != null) {
    if (!Number.isSafeInteger(retries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    }
    if (retries < 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: retries '${retries}' must be greater or equal than zero`
      )
    }
  }
  // One weight is expected per worker node the pool can hold.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
295
/**
 * Validates the tasks queue options. Nullish options are accepted as is.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object, or `concurrency` or `size` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `concurrency` or `size` is negative or zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }
  const size = tasksQueueOptions?.size
  if (size != null) {
    if (!Number.isSafeInteger(size)) {
      throw new TypeError(
        'Invalid worker node tasks queue size: must be an integer'
      )
    }
    if (size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue size: ${size} is a negative integer or zero`
      )
    }
  }
}
332
/** @inheritDoc */
public get info (): PoolInfo {
  // Looked up once: every optional section below is gated on the same requirements.
  // NOTE(review): assumes getTaskStatisticsRequirements() is a pure lookup - confirm.
  const statisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const tasksQueueEnabled = this.opts.enableTasksQueue === true

  // Counts the worker nodes matching the given predicate.
  const countWorkerNodes = (
    predicate: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number =>
    this.workerNodes.reduce(
      (count, workerNode) => (predicate(workerNode) ? count + 1 : count),
      0
    )

  // Sums a numeric value picked from each worker node.
  const sumOverWorkerNodes = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + pick(workerNode),
      0
    )

  // Builds the pool wide statistics of one measurement. Minimum and maximum are
  // always present; average and median only when the strategy requires them.
  const buildMeasurementInfo = (measurement: 'runTime' | 'waitTime') => {
    // Concatenation of the measurement history of every worker node.
    const poolHistory = (): number[] =>
      this.workerNodes.reduce<number[]>(
        (history, workerNode) =>
          history.concat(workerNode.usage[measurement].history),
        []
      )
    return {
      minimum: round(
        min(
          ...this.workerNodes.map(
            workerNode => workerNode.usage[measurement]?.minimum ?? Infinity
          )
        )
      ),
      maximum: round(
        max(
          ...this.workerNodes.map(
            workerNode => workerNode.usage[measurement]?.maximum ?? -Infinity
          )
        )
      ),
      ...(statisticsRequirements[measurement].average && {
        average: round(average(poolHistory()))
      }),
      ...(statisticsRequirements[measurement].median && {
        median: round(median(poolHistory()))
      })
    }
  }

  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization needs both the run time and the wait time aggregates.
    ...(statisticsRequirements.runTime.aggregate &&
      statisticsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    // Tasks queue related figures are only reported when the queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.stolen
      )
    }),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(statisticsRequirements.runTime.aggregate && {
      runTime: buildMeasurementInfo('runTime')
    }),
    ...(statisticsRequirements.waitTime.aggregate && {
      waitTime: buildMeasurementInfo('waitTime')
    })
  }
}
08f3f44c 488
/**
 * The pool readiness boolean status: `true` once the number of ready, non dynamic
 * worker nodes is at least the pool minimum size.
 */
private get ready (): boolean {
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyStaticWorkerNodes.length >= this.minSize
}
503
/**
 * The approximate pool utilization: the aggregated tasks run time and wait time of all
 * worker nodes divided by the time elapsed since the pool start timestamp multiplied by
 * the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
524
/**
 * The type of this pool, provided by the concrete pool implementation.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 */
protected abstract get type (): PoolType
531
/**
 * The type of worker used by this pool, provided by the concrete pool implementation.
 */
protected abstract get worker (): WorkerType
536
/**
 * The pool minimum size, i.e. the number of workers given at construction.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}
ff733df7
JB
543
/**
 * The pool maximum size: the `max` property when it is set, the number of workers otherwise.
 */
protected get maxSize (): number {
  const dynamicMaximum = this.max
  return dynamicMaximum ?? this.numberOfWorkers
}
a35560ba 550
/**
 * Checks that the worker id carried by a message received from a worker is valid.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or does not match any worker node of the pool.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  const isUnknownWorker = this.getWorkerNodeKeyByWorkerId(workerId) === -1
  if (isUnknownWorker) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
569
/**
 * Looks up the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  const ownsWorker = (workerNode: IWorkerNode<Worker, Data>): boolean =>
    workerNode.worker === worker
  return this.workerNodes.findIndex(ownsWorker)
}
581
/**
 * Looks up the worker node key matching the given worker id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  const hasWorkerId = (workerNode: IWorkerNode<Worker, Data>): boolean =>
    workerNode.info.id === workerId
  return this.workerNodes.findIndex(hasWorkerId)
}
593
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  // Strategy options are optional: they are only applied when given.
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Each worker node gets its usage reset and is sent a statistics message.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
612
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // The given options are layered over the defaults before being stored and applied.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(
    this.opts.workerChoiceStrategyOptions
  )
}
626
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  // Switching an enabled tasks queue off: unhook tasks stealing and flush the queues.
  const isDisabling = this.opts.enableTasksQueue === true && !enable
  if (isDisabling) {
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
640
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue not enabled: stored tasks queue options, if any, are dropped.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
  // Hook or unhook the tasks stealing callbacks according to the built options.
  const { taskStealing, tasksStealingOnBackPressure } =
    this.opts.tasksQueueOptions
  if (taskStealing === true) {
    this.setTaskStealing()
  } else {
    this.unsetTaskStealing()
  }
  if (tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  } else {
    this.unsetTasksStealingOnBackPressure()
  }
}
662
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param size - The tasks queue size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
668
/**
 * Installs the task stealing handler as the `onEmptyQueue` callback of every worker node.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    // A fresh bound handler per worker node.
    workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  }
}
675
/**
 * Removes the `onEmptyQueue` callback from every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    delete workerNode.onEmptyQueue
  }
}
681
/**
 * Installs the tasks stealing handler as the `onBackPressure` callback of every worker node.
 */
private setTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    // A fresh bound handler per worker node.
    workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
  }
}
688
/**
 * Removes the `onBackPressure` callback from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    delete workerNode.onBackPressure
  }
}
694
/**
 * Builds the effective tasks queue options: the given options layered over the defaults.
 *
 * @param tasksQueueOptions - The user supplied tasks queue options.
 * @returns The tasks queue options with defaults applied.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  // Defaults: the queue size is the square of the pool maximum size.
  const defaultTasksQueueOptions = {
    size: Math.pow(this.maxSize, 2),
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  }
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
708
/**
 * Whether the pool is full or not.
 *
 * The pool is full once it holds at least as many worker nodes as its maximum size.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
c2ade475 717
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status, provided by the concrete pool implementation.
 */
protected abstract get busy (): boolean
7c0ba920 724
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled, a ready worker node has spare capacity while it executes
 * fewer tasks than the configured concurrency; otherwise only while it executes none.
 *
 * @returns Worker nodes busyness boolean status: `true` when no ready worker node has spare capacity.
 */
protected internalBusy (): boolean {
  if (this.opts.enableTasksQueue === true) {
    const hasWorkerNodeBelowConcurrency = this.workerNodes.some(
      workerNode =>
        workerNode.info.ready &&
        workerNode.usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
    )
    return !hasWorkerNodeBelowConcurrency
  }
  const hasIdleWorkerNode = this.workerNodes.some(
    workerNode =>
      workerNode.info.ready && workerNode.usage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
748
/** @inheritDoc */
public listTaskFunctions (): string[] {
  // The first worker node exposing a non empty task functions list answers for the pool.
  const workerNodeWithTaskFunctions = this.workerNodes.find(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctions) &&
      workerNode.info.taskFunctions.length > 0
  )
  return workerNodeWithTaskFunctions?.info.taskFunctions ?? []
}
761
/**
 * Whether a task shall be executed right away on the given worker node instead of being enqueued.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node tasks queue is empty and the worker node executes fewer tasks than the configured concurrency, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  return executingTasks < (this.opts.tasksQueueOptions?.concurrency as number)
}
769
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Preconditions: each failure rejects the returned promise and stops there.
    if (!this.started) {
      reject(new Error('Cannot execute a task on not started pool'))
      return
    }
    if (name != null) {
      if (typeof name !== 'string') {
        reject(new TypeError('name argument must be a string'))
        return
      }
      if (name.trim().length === 0) {
        reject(new TypeError('name argument must not be an empty string'))
        return
      }
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }

    // Build the task for the worker node chosen by the pool.
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      workerId: this.getWorkerInfo(workerNodeKey).id as number,
      taskId: randomUUID()
    }

    // The promise callbacks are stored under the task id.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })

    // Execute right away without a tasks queue, or when the worker node can take the task now.
    const executeNow =
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.shallExecuteTask(workerNodeKey))
    if (executeNow) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
c97c7edb 824
/** @inheritdoc */
public start (): void {
  // Number of worker nodes not flagged as dynamic currently in the pool.
  const countStaticWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length

  this.starting = true
  // Create worker nodes until the configured number of workers is reached.
  while (countStaticWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
  this.starting = false
  this.started = true
}
840
/** @inheritDoc */
public async destroy (): Promise<void> {
  // All worker nodes are destroyed concurrently.
  const workerNodeDestructions = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodeDestructions)
  // The destroy event is emitted before the pool is flagged as not started.
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.started = false
}
851
/**
 * Sends a kill message to the worker of the given worker node and waits for its answer.
 *
 * @param workerNodeKey - The worker node key.
 * @param workerId - The worker id.
 * @returns A promise resolved when the worker answers with a kill `'success'` and rejected when it answers with a kill `'failure'`.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number,
  workerId: number
): Promise<void> {
  const killAnswered = new Promise<void>((resolve, reject) => {
    // The listener is registered before the kill message is sent.
    this.registerWorkerMessageListener(workerNodeKey, message => {
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(new Error(`Worker ${workerId} kill message handling failed`))
          break
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true, workerId })
  })
  await killAnswered
}
867
/**
 * Terminates the worker node identified by the given worker node key.
 * Provided by the concrete pool implementation.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 874
/**
 * Setup hook called by the abstract constructor before worker nodes are created.
 * The default implementation does nothing. Can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
c97c7edb 884
/**
 * Should return whether the worker is the main worker or not.
 * The constructor throws when this returns `false`.
 */
protected abstract isMain (): boolean
889
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and updates the wait time of the worker node
 * usage and, when task function usage is tracked, of the task function usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  if (this.workerNodes[workerNodeKey]?.usage != null) {
    const workerUsage = this.workerNodes[workerNodeKey].usage
    ++workerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerUsage, task)
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    // Single lookup narrowed by the null check, no unchecked type assertion needed.
    // NOTE(review): assumes getTaskFunctionWorkerUsage() is idempotent - confirm.
    const taskFunctionWorkerUsage = this.workerNodes[
      workerNodeKey
    ].getTaskFunctionWorkerUsage(task.name as string)
    if (taskFunctionWorkerUsage != null) {
      ++taskFunctionWorkerUsage.tasks.executing
      this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
    }
  }
}
919
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the task statistics, run time and ELU of the worker node usage and, when task
 * function usage is tracked, of the usage of the task function named in the message
 * task performance.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  if (this.workerNodes[workerNodeKey]?.usage != null) {
    const workerUsage = this.workerNodes[workerNodeKey].usage
    this.updateTaskStatisticsWorkerUsage(workerUsage, message)
    this.updateRunTimeWorkerUsage(workerUsage, message)
    this.updateEluWorkerUsage(workerUsage, message)
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    // Single lookup narrowed by the null check, no unchecked type assertion needed.
    // NOTE(review): assumes getTaskFunctionWorkerUsage() is idempotent - confirm.
    const taskFunctionWorkerUsage = this.workerNodes[
      workerNodeKey
    ].getTaskFunctionWorkerUsage(message.taskPerformance?.name as string)
    if (taskFunctionWorkerUsage != null) {
      this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
      this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
      this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
    }
  }
}
953
/**
 * Tells whether the worker node shall update its task function worker usage or not.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const info = this.getWorkerInfo(workerNodeKey)
  if (info == null || !Array.isArray(info.taskFunctions)) {
    return false
  }
  // NOTE(review): the threshold of 2 presumably discounts the default task
  // function entries - confirm against the worker side.
  return info.taskFunctions.length > 2
}
968
/**
 * Updates the tasks counters of the given worker usage once a task response
 * has been received: one executing task less (never below zero), one executed
 * or failed task more depending on `message.taskError`.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  if (tasks.executing != null && tasks.executing > 0) {
    --tasks.executing
  }
  if (message.taskError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
986
/**
 * Updates the run time statistics of the given worker usage.
 * Nothing is recorded when the task failed.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.taskError == null) {
    updateMeasurementStatistics(
      workerUsage.runTime,
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
      // A missing run time measurement is accounted as 0.
      message.taskPerformance?.runTime ?? 0
    )
  }
}
1000
/**
 * Updates the wait time statistics of the given worker usage.
 * The wait time is the delay between the task timestamp and now; a task
 * without timestamp has a wait time of 0.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const waitTime = now - (task.timestamp ?? now)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    waitTime
  )
}
1013
/**
 * Updates the event loop utilization (ELU) statistics of the given worker usage.
 * Nothing is recorded when the task failed.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.taskError != null) {
    return
  }
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  // Missing ELU measurements are accounted as 0.
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0
  )
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  // Utilization is the mean of the previous value and the new sample, or the
  // new sample alone when there is no previous value.
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
1046
/**
 * Chooses a worker node for the next task.
 *
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 *
 * When a dynamic worker node shall be created, it is created first and it is
 * chosen directly if the strategy policy allows dynamic worker usage.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  const { dynamicWorkerUsage } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  if (dynamicWorkerUsage) {
    return dynamicWorkerNodeKey
  }
  return this.workerChoiceStrategyContext.execute()
}
1065
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and
 * internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic || this.full) {
    return false
  }
  return this.internalBusy()
}
1074
/**
 * Sends a message to worker given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1087
/**
 * Creates a new worker.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 1094
/**
 * Creates a new, completely set up worker node.
 *
 * The user supplied handlers are registered on the worker before the pool
 * internal `error` and `exit` handlers, then the worker is added to the pool
 * worker nodes and the worker node setup hook is run.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    // Flag the failed worker node as not ready and close its channel before
    // notifying listeners.
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    const shallRestartWorker =
      this.opts.restartWorkerOnError === true && this.started && !this.starting
    if (shallRestartWorker && failedWorkerInfo.dynamic) {
      this.createAndSetupDynamicWorkerNode()
    } else if (shallRestartWorker) {
      this.createAndSetupWorkerNode()
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const workerNodeKey = this.addWorkerNode(worker)

  this.afterWorkerNodeSetup(workerNodeKey)

  return workerNodeKey
}
be0676b3 1138
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular worker node setup, a message listener destroying the
 * worker node on kill messages is registered, the worker is asked to check
 * its activity, and the worker node is flagged as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    // A soft kill is only honored once the worker node has nothing left to do.
    const isWorkerNodeIdle = (): boolean =>
      workerUsage.tasks.executing === 0 &&
      (this.opts.enableTasksQueue === false ||
        (this.opts.enableTasksQueue === true &&
          this.tasksQueueSize(localWorkerNodeKey) === 0))
    // Kill message received from worker
    const shallDestroyWorkerNode =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && isWorkerNodeIdle())
    if (shallDestroyWorkerNode) {
      this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: workerInfo.id as number
  })
  workerInfo.dynamic = true
  const strategyAllowsReadyFlag =
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
  if (strategyAllowsReadyFlag) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1181
/**
 * Registers a listener callback on the worker given its worker node key.
 *
 * @typeParam Message - Type of the data carried by the listened messages.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053
JB
1194
/**
 * Method hooked up after a worker node has been newly created.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  // Wire the task stealing callbacks enabled in the tasks queue options.
  const tasksQueueOptions = this.opts.tasksQueueOptions
  if (tasksQueueOptions?.taskStealing === true) {
    this.workerNodes[workerNodeKey].onEmptyQueue =
      this.taskStealingOnEmptyQueue.bind(this)
  }
  if (tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    this.workerNodes[workerNodeKey].onBackPressure =
      this.tasksStealingOnBackPressure.bind(this)
  }
}
1219
/**
 * Sends the startup message to worker given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1226
/**
 * Sends the statistics message to worker given its worker node key.
 * The message carries the `aggregate` flags of the run time and ELU task
 * statistics requirements.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const runTimeAggregate =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const eluAggregate =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTimeAggregate,
      elu: eluAggregate
    },
    workerId: this.getWorkerInfo(workerNodeKey).id as number
  })
}
a2ed5053
JB
1244
/**
 * Redistributes the tasks queued on the given worker node to the other ready
 * worker nodes, one task at a time, always targeting the ready worker node
 * with the fewest queued tasks.
 *
 * The source worker node is never a destination: moving a task onto the node
 * it was just dequeued from could put it back in the same queue and keep the
 * loop running forever. When no other ready worker node exists the
 * redistribution stops and the remaining tasks stay queued on the source node.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = -1
    for (const [candidateKey, candidate] of this.workerNodes.entries()) {
      if (candidateKey === workerNodeKey || !candidate.info.ready) {
        continue
      }
      if (
        destinationWorkerNodeKey === -1 ||
        candidate.usage.tasks.queued <
          this.workerNodes[destinationWorkerNodeKey].usage.tasks.queued
      ) {
        destinationWorkerNodeKey = candidateKey
      }
    }
    if (destinationWorkerNodeKey === -1) {
      // No eligible destination worker node.
      break
    }
    const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
    const task = {
      ...(this.dequeueTask(workerNodeKey) as Task<Data>),
      workerId: destinationWorkerNode.info.id as number
    }
    if (this.shallExecuteTask(destinationWorkerNodeKey)) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1269
/**
 * Increments the stolen tasks counter of the worker node usage and, when the
 * worker node tracks per task function usage, of the usage registered for the
 * given task name.
 *
 * @param workerNodeKey - The key of the worker node that received the stolen task.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    ++workerNode.usage.tasks.stolen
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    // Look the task function usage up once and narrow it, instead of looking
    // it up twice and asserting the second result.
    const taskFunctionWorkerUsage =
      workerNode.getTaskFunctionWorkerUsage(taskName)
    if (taskFunctionWorkerUsage != null) {
      ++taskFunctionWorkerUsage.tasks.stolen
    }
  }
}
1288
/**
 * Steals one task for the worker whose queue got empty: the task is popped
 * from the ready worker node (other than the requesting one) with the most
 * queued tasks, then executed or enqueued on the requesting worker node.
 *
 * @param workerId - The id of the worker whose tasks queue is empty.
 */
private taskStealingOnEmptyQueue (workerId: number): void {
  const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
  // Most loaded worker nodes first.
  const sourceWorkerNode = [...this.workerNodes]
    .sort(
      (nodeA, nodeB) => nodeB.usage.tasks.queued - nodeA.usage.tasks.queued
    )
    .find(
      node =>
        node.info.ready &&
        node.info.id !== workerId &&
        node.usage.tasks.queued > 0
    )
  if (sourceWorkerNode == null) {
    return
  }
  const stolenTask = {
    ...(sourceWorkerNode.popTask() as Task<Data>),
    workerId: destinationWorkerNode.info.id as number
  }
  if (this.shallExecuteTask(destinationWorkerNodeKey)) {
    this.executeTask(destinationWorkerNodeKey, stolenTask)
  } else {
    this.enqueueTask(destinationWorkerNodeKey, stolenTask)
  }
  this.updateTaskStolenStatisticsWorkerUsage(
    destinationWorkerNodeKey,
    stolenTask.name as string
  )
}
1320
/**
 * Offloads tasks from the worker node under back pressure: while it still has
 * queued tasks, one task is popped for each other ready worker node whose
 * queue is below the configured tasks queue size minus one, visiting the
 * least loaded worker nodes first.
 *
 * Nothing is done when the configured tasks queue size is lower than or equal
 * to one.
 *
 * @param workerId - The id of the worker whose tasks queue is under back pressure.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const sizeOffset = 1
  if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // Least loaded worker nodes first.
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued <
        (this.opts.tasksQueueOptions?.size as number) - sizeOffset
    ) {
      // Positions in the sorted copy are not worker node keys: resolve the
      // key of the destination in the pool worker nodes.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      const task = {
        ...(sourceWorkerNode.popTask() as Task<Data>),
        workerId: workerNode.info.id as number
      }
      if (this.shallExecuteTask(workerNodeKey)) {
        this.executeTask(workerNodeKey, task)
      } else {
        this.enqueueTask(workerNodeKey, task)
      }
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
    }
  }
}
1358
/**
 * This method is the listener registered for each worker message.
 *
 * The message worker id is checked first, then the message is dispatched
 * according to its content: worker ready response, task execution response
 * or task functions message.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    const { ready, taskId, taskFunctions, workerId } = message
    if (ready != null && taskFunctions != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctions != null) {
      // Task functions message received from worker
      this.getWorkerInfo(
        this.getWorkerNodeKeyByWorkerId(workerId)
      ).taskFunctions = taskFunctions
    }
  }
}
1381
/**
 * Handles the worker ready response: stores the ready flag and the task
 * functions in the worker information, then emits the pool `ready` event if
 * the pool is ready.
 *
 * @param message - The received worker ready message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports that it is not ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, taskFunctions, workerId } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = ready as boolean
  workerInfo.taskFunctions = taskFunctions
  if (this.emitter != null && this.ready) {
    this.emitter.emit(PoolEvents.ready, this.info)
  }
}
1395
/**
 * Handles a task execution response: settles the promise registered for the
 * task id, runs the after task execution hook, updates the worker choice
 * strategy and, when tasks queuing is enabled, starts the next queued task of
 * the worker node if its concurrency allows it.
 *
 * Responses without a registered promise are ignored.
 *
 * @param message - The received task execution message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { taskId, taskError, data } = message
  const promiseResponse = this.promiseResponseMap.get(taskId as string)
  if (promiseResponse == null) {
    return
  }
  if (taskError == null) {
    promiseResponse.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    promiseResponse.reject(taskError.message)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.promiseResponseMap.delete(taskId as string)
  const canExecuteQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (canExecuteQueuedTask) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
7c0ba920 1423
/**
 * Emits the pool `busy` event with the pool information when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1429
/**
 * Emits the pool `backPressure` event with the pool information when the pool
 * has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1435
/**
 * Emits the pool `full` event with the pool information when the pool is
 * dynamic and full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1443
/**
 * Gets the worker information given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1453
/**
 * Adds the given worker in the pool worker nodes.
 *
 * The worker node tasks queue size defaults to the square of the pool maximum
 * size when no size is configured in the tasks queue options.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const tasksQueueSize =
    this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
  const addedWorkerNode = new WorkerNode<Worker, Data>(worker, tasksQueueSize)
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey === -1) {
    throw new Error('Worker added not found in worker nodes')
  }
  return addedWorkerNodeKey
}
c923ce56 1477
/**
 * Removes the given worker from the pool worker nodes and from the worker
 * choice strategy context. Does nothing when the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
adc3c320 1490
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Back pressure only exists when tasks queuing is enabled.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1498
/**
 * Tells whether the pool has back pressure, i.e. tasks queuing is enabled and
 * no worker node is free of back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1507
/**
 * Executes the given task on the worker given its worker node key: runs the
 * before task execution hook, sends the task to the worker and emits the task
 * execution related pool events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1519
/**
 * Enqueues the given task on the worker node given its key and emits the task
 * queuing related pool events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSizeAfterEnqueue = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSizeAfterEnqueue
}
1525
/**
 * Dequeues a task from the worker node given its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, if any.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1529
/**
 * Gets the tasks queue size of the worker node given its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker node tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1533
/**
 * Flushes the tasks queue of the worker node given its key: every queued task
 * is dequeued and executed, then the tasks queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  for (
    let queuedTasks = this.tasksQueueSize(workerNodeKey);
    queuedTasks > 0;
    queuedTasks = this.tasksQueueSize(workerNodeKey)
  ) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1543
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 1549}