Merge branch 'master' of github.com:poolifier/poolifier into feature/task-functions
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
76d91ea0 8 Task
5c4d16da 9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
dc021bcc 14 average,
59317253 15 isKillBehavior,
0d80593b 16 isPlainObject,
90d6701c 17 max,
afe0d5bf 18 median,
90d6701c 19 min,
e4f20deb
JB
20 round,
21 updateMeasurementStatistics
bbeadd16 22} from '../utils'
59317253 23import { KillBehaviors } from '../worker/worker-options'
6703b9f4 24import type { TaskFunction } from '../worker/task-functions'
c4855468 25import {
65d7a1c9 26 type IPool,
7c5a1080 27 PoolEmitter,
c4855468 28 PoolEvents,
6b27d407 29 type PoolInfo,
c4855468 30 type PoolOptions,
6b27d407
JB
31 type PoolType,
32 PoolTypes,
4b628b48 33 type TasksQueueOptions
c4855468 34} from './pool'
bbfa38a2
JB
35import type {
36 IWorker,
37 IWorkerNode,
38 WorkerInfo,
39 WorkerType,
40 WorkerUsage
e102732c 41} from './worker'
a35560ba 42import {
008512c7 43 type MeasurementStatisticsRequirements,
f0d7f803 44 Measurements,
a35560ba 45 WorkerChoiceStrategies,
a20f0ba5
JB
46 type WorkerChoiceStrategy,
47 type WorkerChoiceStrategyOptions
bdaf31cd
JB
48} from './selection-strategies/selection-strategies-types'
49import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 50import { version } from './version'
4b628b48 51import { WorkerNode } from './worker-node'
23ccf9d7 52
729c563d 53/**
ea7a90d3 54 * Base class that implements some shared logic for all poolifier pools.
729c563d 55 *
38e795c1 56 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
57 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
58 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 59 */
c97c7edb 60export abstract class AbstractPool<
f06e48d8 61 Worker extends IWorker,
d3c8a1a8
S
62 Data = unknown,
63 Response = unknown
c4855468 64> implements IPool<Worker, Data, Response> {
afc003b2 65 /** @inheritDoc */
4b628b48 66 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 67
afc003b2 68 /** @inheritDoc */
7c0ba920
JB
69 public readonly emitter?: PoolEmitter
70
be0676b3 71 /**
419e8121 72 * The task execution response promise map:
2740a743 73 * - `key`: The message id of each submitted task.
a3445496 74 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 75 *
a3445496 76 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 77 */
501aea93
JB
78 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
79 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 80
a35560ba 81 /**
51fe3d3c 82 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
83 */
84 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
85 Worker,
86 Data,
87 Response
a35560ba
S
88 >
89
8735b4e5
JB
90 /**
91 * Dynamic pool maximum size property placeholder.
92 */
93 protected readonly max?: number
94
72ae84a2
JB
95 /**
96 * The task functions added at runtime map:
97 * - `key`: The task function name.
98 * - `value`: The task function itself.
99 */
100 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
101
15b176e0
JB
102 /**
103 * Whether the pool is started or not.
104 */
105 private started: boolean
47352846
JB
106 /**
107 * Whether the pool is starting or not.
108 */
109 private starting: boolean
afe0d5bf
JB
110 /**
111 * The start timestamp of the pool.
112 */
113 private readonly startTimestamp
114
729c563d
S
115 /**
116 * Constructs a new poolifier pool.
117 *
38e795c1 118 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 119 * @param filePath - Path to the worker file.
38e795c1 120 * @param opts - Options for the pool.
729c563d 121 */
c97c7edb 122 public constructor (
b4213b7f
JB
123 protected readonly numberOfWorkers: number,
124 protected readonly filePath: string,
125 protected readonly opts: PoolOptions<Worker>
c97c7edb 126 ) {
78cea37e 127 if (!this.isMain()) {
04f45163 128 throw new Error(
8c6d4acf 129 'Cannot start a pool from a worker with the same type as the pool'
04f45163 130 )
c97c7edb 131 }
8d3782fa 132 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 133 this.checkFilePath(this.filePath)
7c0ba920 134 this.checkPoolOptions(this.opts)
1086026a 135
7254e419
JB
136 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
137 this.executeTask = this.executeTask.bind(this)
138 this.enqueueTask = this.enqueueTask.bind(this)
1086026a 139
6bd72cd0 140 if (this.opts.enableEvents === true) {
7c0ba920
JB
141 this.emitter = new PoolEmitter()
142 }
d59df138
JB
143 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
144 Worker,
145 Data,
146 Response
da309861
JB
147 >(
148 this,
149 this.opts.workerChoiceStrategy,
150 this.opts.workerChoiceStrategyOptions
151 )
b6b32453
JB
152
153 this.setupHook()
154
72ae84a2
JB
155 this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
156
47352846 157 this.started = false
075e51d1 158 this.starting = false
47352846
JB
159 if (this.opts.startWorkers === true) {
160 this.start()
161 }
afe0d5bf
JB
162
163 this.startTimestamp = performance.now()
c97c7edb
S
164 }
165
a35560ba 166 private checkFilePath (filePath: string): void {
ffcbbad8
JB
167 if (
168 filePath == null ||
3d6dd312 169 typeof filePath !== 'string' ||
ffcbbad8
JB
170 (typeof filePath === 'string' && filePath.trim().length === 0)
171 ) {
c510fea7
APA
172 throw new Error('Please specify a file with a worker implementation')
173 }
3d6dd312
JB
174 if (!existsSync(filePath)) {
175 throw new Error(`Cannot find the worker file '${filePath}'`)
176 }
c510fea7
APA
177 }
178
8d3782fa
JB
179 private checkNumberOfWorkers (numberOfWorkers: number): void {
180 if (numberOfWorkers == null) {
181 throw new Error(
182 'Cannot instantiate a pool without specifying the number of workers'
183 )
78cea37e 184 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 185 throw new TypeError(
0d80593b 186 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
187 )
188 } else if (numberOfWorkers < 0) {
473c717a 189 throw new RangeError(
8d3782fa
JB
190 'Cannot instantiate a pool with a negative number of workers'
191 )
6b27d407 192 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
2431bdb4
JB
193 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
194 }
195 }
196
197 protected checkDynamicPoolSize (min: number, max: number): void {
079de991 198 if (this.type === PoolTypes.dynamic) {
a5ed75b7 199 if (max == null) {
e695d66f 200 throw new TypeError(
a5ed75b7
JB
201 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
202 )
203 } else if (!Number.isSafeInteger(max)) {
2761efb4
JB
204 throw new TypeError(
205 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
206 )
207 } else if (min > max) {
079de991
JB
208 throw new RangeError(
209 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
210 )
b97d82d8 211 } else if (max === 0) {
079de991 212 throw new RangeError(
d640b48b 213 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
079de991
JB
214 )
215 } else if (min === max) {
216 throw new RangeError(
217 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
218 )
219 }
8d3782fa
JB
220 }
221 }
222
7c0ba920 223 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b 224 if (isPlainObject(opts)) {
47352846 225 this.opts.startWorkers = opts.startWorkers ?? true
0d80593b
JB
226 this.opts.workerChoiceStrategy =
227 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
228 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
8990357d
JB
229 this.opts.workerChoiceStrategyOptions = {
230 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
231 ...opts.workerChoiceStrategyOptions
232 }
49be33fe
JB
233 this.checkValidWorkerChoiceStrategyOptions(
234 this.opts.workerChoiceStrategyOptions
235 )
1f68cede 236 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
237 this.opts.enableEvents = opts.enableEvents ?? true
238 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
239 if (this.opts.enableTasksQueue) {
240 this.checkValidTasksQueueOptions(
241 opts.tasksQueueOptions as TasksQueueOptions
242 )
243 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
244 opts.tasksQueueOptions as TasksQueueOptions
245 )
246 }
247 } else {
248 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 249 }
aee46736
JB
250 }
251
252 private checkValidWorkerChoiceStrategy (
253 workerChoiceStrategy: WorkerChoiceStrategy
254 ): void {
255 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 256 throw new Error(
aee46736 257 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
258 )
259 }
7c0ba920
JB
260 }
261
0d80593b
JB
262 private checkValidWorkerChoiceStrategyOptions (
263 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
264 ): void {
265 if (!isPlainObject(workerChoiceStrategyOptions)) {
266 throw new TypeError(
267 'Invalid worker choice strategy options: must be a plain object'
268 )
269 }
8990357d 270 if (
8c0b113f
JB
271 workerChoiceStrategyOptions.retries != null &&
272 !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
8990357d
JB
273 ) {
274 throw new TypeError(
8c0b113f 275 'Invalid worker choice strategy options: retries must be an integer'
8990357d
JB
276 )
277 }
278 if (
8c0b113f
JB
279 workerChoiceStrategyOptions.retries != null &&
280 workerChoiceStrategyOptions.retries < 0
8990357d
JB
281 ) {
282 throw new RangeError(
8c0b113f 283 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
8990357d
JB
284 )
285 }
49be33fe
JB
286 if (
287 workerChoiceStrategyOptions.weights != null &&
6b27d407 288 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
289 ) {
290 throw new Error(
291 'Invalid worker choice strategy options: must have a weight for each worker node'
292 )
293 }
f0d7f803
JB
294 if (
295 workerChoiceStrategyOptions.measurement != null &&
296 !Object.values(Measurements).includes(
297 workerChoiceStrategyOptions.measurement
298 )
299 ) {
300 throw new Error(
301 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
302 )
303 }
0d80593b
JB
304 }
305
a20f0ba5 306 private checkValidTasksQueueOptions (
76d91ea0 307 tasksQueueOptions: TasksQueueOptions
a20f0ba5 308 ): void {
0d80593b
JB
309 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
310 throw new TypeError('Invalid tasks queue options: must be a plain object')
311 }
f0d7f803 312 if (
b7d085c4
JB
313 tasksQueueOptions?.concurrency != null &&
314 !Number.isSafeInteger(tasksQueueOptions?.concurrency)
f0d7f803
JB
315 ) {
316 throw new TypeError(
20c6f652 317 'Invalid worker node tasks concurrency: must be an integer'
f0d7f803
JB
318 )
319 }
320 if (
b7d085c4
JB
321 tasksQueueOptions?.concurrency != null &&
322 tasksQueueOptions?.concurrency <= 0
f0d7f803 323 ) {
e695d66f 324 throw new RangeError(
b7d085c4 325 `Invalid worker node tasks concurrency: ${tasksQueueOptions?.concurrency} is a negative integer or zero`
20c6f652
JB
326 )
327 }
20c6f652 328 if (
b7d085c4
JB
329 tasksQueueOptions?.size != null &&
330 !Number.isSafeInteger(tasksQueueOptions?.size)
20c6f652 331 ) {
ff3f866a 332 throw new TypeError(
68dbcdc0 333 'Invalid worker node tasks queue size: must be an integer'
ff3f866a
JB
334 )
335 }
b7d085c4 336 if (tasksQueueOptions?.size != null && tasksQueueOptions?.size <= 0) {
20c6f652 337 throw new RangeError(
b7d085c4 338 `Invalid worker node tasks queue size: ${tasksQueueOptions?.size} is a negative integer or zero`
a20f0ba5
JB
339 )
340 }
341 }
342
08f3f44c 343 /** @inheritDoc */
6b27d407
JB
344 public get info (): PoolInfo {
345 return {
23ccf9d7 346 version,
6b27d407 347 type: this.type,
184855e6 348 worker: this.worker,
47352846 349 started: this.started,
2431bdb4
JB
350 ready: this.ready,
351 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
6b27d407
JB
352 minSize: this.minSize,
353 maxSize: this.maxSize,
c05f0d50
JB
354 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
355 .runTime.aggregate &&
1305e9a8
JB
356 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
357 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
358 workerNodes: this.workerNodes.length,
359 idleWorkerNodes: this.workerNodes.reduce(
360 (accumulator, workerNode) =>
f59e1027 361 workerNode.usage.tasks.executing === 0
a4e07f72
JB
362 ? accumulator + 1
363 : accumulator,
6b27d407
JB
364 0
365 ),
366 busyWorkerNodes: this.workerNodes.reduce(
367 (accumulator, workerNode) =>
f59e1027 368 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
369 0
370 ),
a4e07f72 371 executedTasks: this.workerNodes.reduce(
6b27d407 372 (accumulator, workerNode) =>
f59e1027 373 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
374 0
375 ),
376 executingTasks: this.workerNodes.reduce(
377 (accumulator, workerNode) =>
f59e1027 378 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
379 0
380 ),
daf86646
JB
381 ...(this.opts.enableTasksQueue === true && {
382 queuedTasks: this.workerNodes.reduce(
383 (accumulator, workerNode) =>
384 accumulator + workerNode.usage.tasks.queued,
385 0
386 )
387 }),
388 ...(this.opts.enableTasksQueue === true && {
389 maxQueuedTasks: this.workerNodes.reduce(
390 (accumulator, workerNode) =>
391 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
392 0
393 )
394 }),
a1763c54
JB
395 ...(this.opts.enableTasksQueue === true && {
396 backPressure: this.hasBackPressure()
397 }),
68cbdc84
JB
398 ...(this.opts.enableTasksQueue === true && {
399 stolenTasks: this.workerNodes.reduce(
400 (accumulator, workerNode) =>
401 accumulator + workerNode.usage.tasks.stolen,
402 0
403 )
404 }),
a4e07f72
JB
405 failedTasks: this.workerNodes.reduce(
406 (accumulator, workerNode) =>
f59e1027 407 accumulator + workerNode.usage.tasks.failed,
a4e07f72 408 0
1dcf8b7b
JB
409 ),
410 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
411 .runTime.aggregate && {
412 runTime: {
98e72cda 413 minimum: round(
90d6701c 414 min(
98e72cda 415 ...this.workerNodes.map(
041dc05b 416 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
98e72cda 417 )
1dcf8b7b
JB
418 )
419 ),
98e72cda 420 maximum: round(
90d6701c 421 max(
98e72cda 422 ...this.workerNodes.map(
041dc05b 423 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
98e72cda 424 )
1dcf8b7b 425 )
98e72cda 426 ),
3baa0837
JB
427 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
428 .runTime.average && {
429 average: round(
430 average(
431 this.workerNodes.reduce<number[]>(
432 (accumulator, workerNode) =>
433 accumulator.concat(workerNode.usage.runTime.history),
434 []
435 )
98e72cda 436 )
dc021bcc 437 )
3baa0837 438 }),
98e72cda
JB
439 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
440 .runTime.median && {
441 median: round(
442 median(
3baa0837
JB
443 this.workerNodes.reduce<number[]>(
444 (accumulator, workerNode) =>
445 accumulator.concat(workerNode.usage.runTime.history),
446 []
98e72cda
JB
447 )
448 )
449 )
450 })
1dcf8b7b
JB
451 }
452 }),
453 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
454 .waitTime.aggregate && {
455 waitTime: {
98e72cda 456 minimum: round(
90d6701c 457 min(
98e72cda 458 ...this.workerNodes.map(
041dc05b 459 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
98e72cda 460 )
1dcf8b7b
JB
461 )
462 ),
98e72cda 463 maximum: round(
90d6701c 464 max(
98e72cda 465 ...this.workerNodes.map(
041dc05b 466 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
98e72cda 467 )
1dcf8b7b 468 )
98e72cda 469 ),
3baa0837
JB
470 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
471 .waitTime.average && {
472 average: round(
473 average(
474 this.workerNodes.reduce<number[]>(
475 (accumulator, workerNode) =>
476 accumulator.concat(workerNode.usage.waitTime.history),
477 []
478 )
98e72cda 479 )
dc021bcc 480 )
3baa0837 481 }),
98e72cda
JB
482 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
483 .waitTime.median && {
484 median: round(
485 median(
3baa0837
JB
486 this.workerNodes.reduce<number[]>(
487 (accumulator, workerNode) =>
488 accumulator.concat(workerNode.usage.waitTime.history),
489 []
98e72cda
JB
490 )
491 )
492 )
493 })
1dcf8b7b
JB
494 }
495 })
6b27d407
JB
496 }
497 }
08f3f44c 498
aa9eede8
JB
499 /**
500 * The pool readiness boolean status.
501 */
2431bdb4
JB
502 private get ready (): boolean {
503 return (
b97d82d8
JB
504 this.workerNodes.reduce(
505 (accumulator, workerNode) =>
506 !workerNode.info.dynamic && workerNode.info.ready
507 ? accumulator + 1
508 : accumulator,
509 0
510 ) >= this.minSize
2431bdb4
JB
511 )
512 }
513
afe0d5bf 514 /**
aa9eede8 515 * The approximate pool utilization.
afe0d5bf
JB
516 *
517 * @returns The pool utilization.
518 */
519 private get utilization (): number {
8e5ca040 520 const poolTimeCapacity =
fe7d90db 521 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
522 const totalTasksRunTime = this.workerNodes.reduce(
523 (accumulator, workerNode) =>
71514351 524 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
525 0
526 )
527 const totalTasksWaitTime = this.workerNodes.reduce(
528 (accumulator, workerNode) =>
71514351 529 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
530 0
531 )
8e5ca040 532 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
533 }
534
8881ae32 535 /**
aa9eede8 536 * The pool type.
8881ae32
JB
537 *
538 * If it is `'dynamic'`, it provides the `max` property.
539 */
540 protected abstract get type (): PoolType
541
184855e6 542 /**
aa9eede8 543 * The worker type.
184855e6
JB
544 */
545 protected abstract get worker (): WorkerType
546
c2ade475 547 /**
aa9eede8 548 * The pool minimum size.
c2ade475 549 */
8735b4e5
JB
550 protected get minSize (): number {
551 return this.numberOfWorkers
552 }
ff733df7
JB
553
554 /**
aa9eede8 555 * The pool maximum size.
ff733df7 556 */
8735b4e5
JB
557 protected get maxSize (): number {
558 return this.max ?? this.numberOfWorkers
559 }
a35560ba 560
6b813701
JB
561 /**
562 * Checks if the worker id sent in the received message from a worker is valid.
563 *
564 * @param message - The received message.
565 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
566 */
21f710aa 567 private checkMessageWorkerId (message: MessageValue<Response>): void {
310de0aa
JB
568 if (message.workerId == null) {
569 throw new Error('Worker message received without worker id')
570 } else if (
21f710aa 571 message.workerId != null &&
aad6fb64 572 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
21f710aa
JB
573 ) {
574 throw new Error(
575 `Worker message received from unknown worker '${message.workerId}'`
576 )
577 }
578 }
579
ffcbbad8 580 /**
f06e48d8 581 * Gets the given worker its worker node key.
ffcbbad8
JB
582 *
583 * @param worker - The worker.
f59e1027 584 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 585 */
aad6fb64 586 private getWorkerNodeKeyByWorker (worker: Worker): number {
f06e48d8 587 return this.workerNodes.findIndex(
041dc05b 588 workerNode => workerNode.worker === worker
f06e48d8 589 )
bf9549ae
JB
590 }
591
aa9eede8
JB
592 /**
593 * Gets the worker node key given its worker id.
594 *
595 * @param workerId - The worker id.
aad6fb64 596 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
aa9eede8 597 */
72ae84a2 598 private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
aad6fb64 599 return this.workerNodes.findIndex(
041dc05b 600 workerNode => workerNode.info.id === workerId
aad6fb64 601 )
aa9eede8
JB
602 }
603
afc003b2 604 /** @inheritDoc */
a35560ba 605 public setWorkerChoiceStrategy (
59219cbb
JB
606 workerChoiceStrategy: WorkerChoiceStrategy,
607 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 608 ): void {
aee46736 609 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 610 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
611 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
612 this.opts.workerChoiceStrategy
613 )
614 if (workerChoiceStrategyOptions != null) {
615 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
616 }
aa9eede8 617 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
4b628b48 618 workerNode.resetUsage()
9edb9717 619 this.sendStatisticsMessageToWorker(workerNodeKey)
59219cbb 620 }
a20f0ba5
JB
621 }
622
623 /** @inheritDoc */
624 public setWorkerChoiceStrategyOptions (
625 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
626 ): void {
0d80593b 627 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
8990357d
JB
628 this.opts.workerChoiceStrategyOptions = {
629 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
630 ...workerChoiceStrategyOptions
631 }
a20f0ba5
JB
632 this.workerChoiceStrategyContext.setOptions(
633 this.opts.workerChoiceStrategyOptions
a35560ba
S
634 )
635 }
636
a20f0ba5 637 /** @inheritDoc */
8f52842f
JB
638 public enableTasksQueue (
639 enable: boolean,
640 tasksQueueOptions?: TasksQueueOptions
641 ): void {
a20f0ba5 642 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 643 this.flushTasksQueues()
a20f0ba5
JB
644 }
645 this.opts.enableTasksQueue = enable
8f52842f 646 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
647 }
648
649 /** @inheritDoc */
8f52842f 650 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 651 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
652 this.checkValidTasksQueueOptions(tasksQueueOptions)
653 this.opts.tasksQueueOptions =
654 this.buildTasksQueueOptions(tasksQueueOptions)
5b49e864 655 this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
5baee0d7 656 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
657 delete this.opts.tasksQueueOptions
658 }
659 }
660
5b49e864 661 private setTasksQueueSize (size: number): void {
20c6f652 662 for (const workerNode of this.workerNodes) {
ff3f866a 663 workerNode.tasksQueueBackPressureSize = size
20c6f652
JB
664 }
665 }
666
a20f0ba5
JB
667 private buildTasksQueueOptions (
668 tasksQueueOptions: TasksQueueOptions
669 ): TasksQueueOptions {
670 return {
20c6f652 671 ...{
ff3f866a 672 size: Math.pow(this.maxSize, 2),
47352846 673 concurrency: 1,
dbd73092 674 taskStealing: true,
47352846 675 tasksStealingOnBackPressure: true
20c6f652
JB
676 },
677 ...tasksQueueOptions
a20f0ba5
JB
678 }
679 }
680
c319c66b
JB
681 /**
682 * Whether the pool is full or not.
683 *
684 * The pool filling boolean status.
685 */
dea903a8
JB
686 protected get full (): boolean {
687 return this.workerNodes.length >= this.maxSize
688 }
c2ade475 689
c319c66b
JB
690 /**
691 * Whether the pool is busy or not.
692 *
693 * The pool busyness boolean status.
694 */
695 protected abstract get busy (): boolean
7c0ba920 696
6c6afb84 697 /**
3d76750a 698 * Whether worker nodes are executing concurrently their tasks quota or not.
6c6afb84
JB
699 *
700 * @returns Worker nodes busyness boolean status.
701 */
c2ade475 702 protected internalBusy (): boolean {
3d76750a
JB
703 if (this.opts.enableTasksQueue === true) {
704 return (
705 this.workerNodes.findIndex(
041dc05b 706 workerNode =>
3d76750a
JB
707 workerNode.info.ready &&
708 workerNode.usage.tasks.executing <
709 (this.opts.tasksQueueOptions?.concurrency as number)
710 ) === -1
711 )
3d76750a 712 }
419e8121
JB
713 return (
714 this.workerNodes.findIndex(
715 workerNode =>
716 workerNode.info.ready && workerNode.usage.tasks.executing === 0
717 ) === -1
718 )
cb70b19d
JB
719 }
720
e81c38f2 721 private async sendTaskFunctionOperationToWorker (
72ae84a2
JB
722 workerNodeKey: number,
723 message: MessageValue<Data>
724 ): Promise<boolean> {
725 const workerId = this.getWorkerInfo(workerNodeKey).id as number
726 return await new Promise<boolean>((resolve, reject) => {
727 this.registerWorkerMessageListener(workerNodeKey, message => {
728 if (
729 message.workerId === workerId &&
730 message.taskFunctionOperationStatus === true
731 ) {
732 resolve(true)
733 } else if (
734 message.workerId === workerId &&
735 message.taskFunctionOperationStatus === false
736 ) {
737 reject(
738 new Error(
739 `Task function operation ${
740 message.taskFunctionOperation as string
741 } failed on worker ${message.workerId}`
742 )
743 )
744 }
745 })
746 this.sendToWorker(workerNodeKey, message)
747 })
748 }
749
750 private async sendTaskFunctionOperationToWorkers (
e81c38f2
JB
751 message: Omit<MessageValue<Data>, 'workerId'>
752 ): Promise<boolean> {
753 return await new Promise<boolean>((resolve, reject) => {
754 const responsesReceived = new Array<MessageValue<Data | Response>>()
755 for (const [workerNodeKey] of this.workerNodes.entries()) {
756 this.registerWorkerMessageListener(workerNodeKey, message => {
757 if (message.taskFunctionOperationStatus != null) {
758 responsesReceived.push(message)
759 if (
760 responsesReceived.length === this.workerNodes.length &&
761 responsesReceived.every(
762 message => message.taskFunctionOperationStatus === true
763 )
764 ) {
765 resolve(true)
766 } else if (
767 responsesReceived.length === this.workerNodes.length &&
768 responsesReceived.some(
769 message => message.taskFunctionOperationStatus === false
770 )
771 ) {
772 reject(
773 new Error(
774 `Task function operation ${
775 message.taskFunctionOperation as string
72ae84a2 776 } failed on worker ${message.workerId as number}`
e81c38f2
JB
777 )
778 )
779 }
780 }
781 })
72ae84a2 782 this.sendToWorker(workerNodeKey, message)
e81c38f2
JB
783 }
784 })
6703b9f4
JB
785 }
786
787 /** @inheritDoc */
788 public hasTaskFunction (name: string): boolean {
edbc15c6
JB
789 for (const workerNode of this.workerNodes) {
790 if (
791 Array.isArray(workerNode.info.taskFunctionNames) &&
792 workerNode.info.taskFunctionNames.includes(name)
793 ) {
794 return true
795 }
796 }
797 return false
6703b9f4
JB
798 }
799
800 /** @inheritDoc */
e81c38f2
JB
801 public async addTaskFunction (
802 name: string,
72ae84a2 803 taskFunction: TaskFunction<Data, Response>
e81c38f2 804 ): Promise<boolean> {
72ae84a2
JB
805 this.taskFunctions.set(name, taskFunction)
806 return await this.sendTaskFunctionOperationToWorkers({
6703b9f4
JB
807 taskFunctionOperation: 'add',
808 taskFunctionName: name,
809 taskFunction: taskFunction.toString()
810 })
6703b9f4
JB
811 }
812
813 /** @inheritDoc */
e81c38f2 814 public async removeTaskFunction (name: string): Promise<boolean> {
72ae84a2
JB
815 this.taskFunctions.delete(name)
816 return await this.sendTaskFunctionOperationToWorkers({
6703b9f4
JB
817 taskFunctionOperation: 'remove',
818 taskFunctionName: name
819 })
6703b9f4
JB
820 }
821
90d7d101 822 /** @inheritDoc */
6703b9f4 823 public listTaskFunctionNames (): string[] {
f2dbbf95
JB
824 for (const workerNode of this.workerNodes) {
825 if (
6703b9f4
JB
826 Array.isArray(workerNode.info.taskFunctionNames) &&
827 workerNode.info.taskFunctionNames.length > 0
f2dbbf95 828 ) {
6703b9f4 829 return workerNode.info.taskFunctionNames
f2dbbf95 830 }
90d7d101 831 }
f2dbbf95 832 return []
90d7d101
JB
833 }
834
6703b9f4 835 /** @inheritDoc */
e81c38f2 836 public async setDefaultTaskFunction (name: string): Promise<boolean> {
72ae84a2 837 return await this.sendTaskFunctionOperationToWorkers({
6703b9f4
JB
838 taskFunctionOperation: 'default',
839 taskFunctionName: name
840 })
6703b9f4
JB
841 }
842
375f7504
JB
843 private shallExecuteTask (workerNodeKey: number): boolean {
844 return (
845 this.tasksQueueSize(workerNodeKey) === 0 &&
846 this.workerNodes[workerNodeKey].usage.tasks.executing <
847 (this.opts.tasksQueueOptions?.concurrency as number)
848 )
849 }
850
afc003b2 851 /** @inheritDoc */
7d91a8cd
JB
852 public async execute (
853 data?: Data,
854 name?: string,
855 transferList?: TransferListItem[]
856 ): Promise<Response> {
52b71763 857 return await new Promise<Response>((resolve, reject) => {
15b176e0 858 if (!this.started) {
47352846 859 reject(new Error('Cannot execute a task on not started pool'))
9d2d0da1 860 return
15b176e0 861 }
7d91a8cd
JB
862 if (name != null && typeof name !== 'string') {
863 reject(new TypeError('name argument must be a string'))
9d2d0da1 864 return
7d91a8cd 865 }
90d7d101
JB
866 if (
867 name != null &&
868 typeof name === 'string' &&
869 name.trim().length === 0
870 ) {
f58b60b9 871 reject(new TypeError('name argument must not be an empty string'))
9d2d0da1 872 return
90d7d101 873 }
b558f6b5
JB
874 if (transferList != null && !Array.isArray(transferList)) {
875 reject(new TypeError('transferList argument must be an array'))
9d2d0da1 876 return
b558f6b5
JB
877 }
878 const timestamp = performance.now()
879 const workerNodeKey = this.chooseWorkerNode()
501aea93 880 const task: Task<Data> = {
52b71763
JB
881 name: name ?? DEFAULT_TASK_NAME,
882 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
883 data: data ?? ({} as Data),
7d91a8cd 884 transferList,
52b71763 885 timestamp,
7629bdf1 886 taskId: randomUUID()
52b71763 887 }
7629bdf1 888 this.promiseResponseMap.set(task.taskId as string, {
2e81254d
JB
889 resolve,
890 reject,
501aea93 891 workerNodeKey
2e81254d 892 })
52b71763 893 if (
4e377863
JB
894 this.opts.enableTasksQueue === false ||
895 (this.opts.enableTasksQueue === true &&
375f7504 896 this.shallExecuteTask(workerNodeKey))
52b71763 897 ) {
501aea93 898 this.executeTask(workerNodeKey, task)
4e377863
JB
899 } else {
900 this.enqueueTask(workerNodeKey, task)
52b71763 901 }
2e81254d 902 })
280c2a77 903 }
c97c7edb 904
47352846
JB
905 /** @inheritdoc */
906 public start (): void {
907 this.starting = true
908 while (
909 this.workerNodes.reduce(
910 (accumulator, workerNode) =>
911 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
912 0
913 ) < this.numberOfWorkers
914 ) {
915 this.createAndSetupWorkerNode()
916 }
917 this.starting = false
918 this.started = true
919 }
920
afc003b2 921 /** @inheritDoc */
c97c7edb 922 public async destroy (): Promise<void> {
1fbcaa7c 923 await Promise.all(
81c02522 924 this.workerNodes.map(async (_, workerNodeKey) => {
aa9eede8 925 await this.destroyWorkerNode(workerNodeKey)
1fbcaa7c
JB
926 })
927 )
33e6bb4c 928 this.emitter?.emit(PoolEvents.destroy, this.info)
15b176e0 929 this.started = false
c97c7edb
S
930 }
931
1e3214b6 932 protected async sendKillMessageToWorker (
72ae84a2 933 workerNodeKey: number
1e3214b6 934 ): Promise<void> {
9edb9717 935 await new Promise<void>((resolve, reject) => {
041dc05b 936 this.registerWorkerMessageListener(workerNodeKey, message => {
1e3214b6
JB
937 if (message.kill === 'success') {
938 resolve()
939 } else if (message.kill === 'failure') {
72ae84a2
JB
940 reject(
941 new Error(
942 `Worker ${
943 message.workerId as number
944 } kill message handling failed`
945 )
946 )
1e3214b6
JB
947 }
948 })
72ae84a2 949 this.sendToWorker(workerNodeKey, { kill: true })
1e3214b6 950 })
1e3214b6
JB
951 }
952
4a6952ff 953 /**
aa9eede8 954 * Terminates the worker node given its worker node key.
4a6952ff 955 *
aa9eede8 956 * @param workerNodeKey - The worker node key.
4a6952ff 957 */
81c02522 958 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 959
729c563d 960 /**
6677a3d3
JB
961 * Setup hook to execute code before worker nodes are created in the abstract constructor.
962 * Can be overridden.
afc003b2
JB
963 *
964 * @virtual
729c563d 965 */
280c2a77 966 protected setupHook (): void {
965df41c 967 /* Intentionally empty */
280c2a77 968 }
c97c7edb 969
729c563d 970 /**
280c2a77
S
971 * Should return whether the worker is the main worker or not.
972 */
973 protected abstract isMain (): boolean
974
975 /**
2e81254d 976 * Hook executed before the worker task execution.
bf9549ae 977 * Can be overridden.
729c563d 978 *
f06e48d8 979 * @param workerNodeKey - The worker node key.
1c6fe997 980 * @param task - The task to execute.
729c563d 981 */
1c6fe997
JB
982 protected beforeTaskExecutionHook (
983 workerNodeKey: number,
984 task: Task<Data>
985 ): void {
94407def
JB
986 if (this.workerNodes[workerNodeKey]?.usage != null) {
987 const workerUsage = this.workerNodes[workerNodeKey].usage
988 ++workerUsage.tasks.executing
989 this.updateWaitTimeWorkerUsage(workerUsage, task)
990 }
991 if (
992 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
993 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
994 task.name as string
995 ) != null
996 ) {
db0e38ee 997 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 998 workerNodeKey
db0e38ee 999 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
5623b8d5
JB
1000 ++taskFunctionWorkerUsage.tasks.executing
1001 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
b558f6b5 1002 }
c97c7edb
S
1003 }
1004
c01733f1 1005 /**
2e81254d 1006 * Hook executed after the worker task execution.
bf9549ae 1007 * Can be overridden.
c01733f1 1008 *
501aea93 1009 * @param workerNodeKey - The worker node key.
38e795c1 1010 * @param message - The received message.
c01733f1 1011 */
2e81254d 1012 protected afterTaskExecutionHook (
501aea93 1013 workerNodeKey: number,
2740a743 1014 message: MessageValue<Response>
bf9549ae 1015 ): void {
94407def
JB
1016 if (this.workerNodes[workerNodeKey]?.usage != null) {
1017 const workerUsage = this.workerNodes[workerNodeKey].usage
1018 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
1019 this.updateRunTimeWorkerUsage(workerUsage, message)
1020 this.updateEluWorkerUsage(workerUsage, message)
1021 }
1022 if (
1023 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1024 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
5623b8d5 1025 message.taskPerformance?.name as string
94407def
JB
1026 ) != null
1027 ) {
db0e38ee 1028 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 1029 workerNodeKey
db0e38ee 1030 ].getTaskFunctionWorkerUsage(
0628755c 1031 message.taskPerformance?.name as string
b558f6b5 1032 ) as WorkerUsage
db0e38ee
JB
1033 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
1034 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
1035 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
b558f6b5
JB
1036 }
1037 }
1038
db0e38ee
JB
1039 /**
1040 * Whether the worker node shall update its task function worker usage or not.
1041 *
1042 * @param workerNodeKey - The worker node key.
1043 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1044 */
1045 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
a5d15204 1046 const workerInfo = this.getWorkerInfo(workerNodeKey)
b558f6b5 1047 return (
94407def 1048 workerInfo != null &&
6703b9f4
JB
1049 Array.isArray(workerInfo.taskFunctionNames) &&
1050 workerInfo.taskFunctionNames.length > 2
b558f6b5 1051 )
f1c06930
JB
1052 }
1053
1054 private updateTaskStatisticsWorkerUsage (
1055 workerUsage: WorkerUsage,
1056 message: MessageValue<Response>
1057 ): void {
a4e07f72 1058 const workerTaskStatistics = workerUsage.tasks
5bb5be17
JB
1059 if (
1060 workerTaskStatistics.executing != null &&
1061 workerTaskStatistics.executing > 0
1062 ) {
1063 --workerTaskStatistics.executing
5bb5be17 1064 }
6703b9f4 1065 if (message.workerError == null) {
98e72cda
JB
1066 ++workerTaskStatistics.executed
1067 } else {
a4e07f72 1068 ++workerTaskStatistics.failed
2740a743 1069 }
f8eb0a2a
JB
1070 }
1071
a4e07f72
JB
1072 private updateRunTimeWorkerUsage (
1073 workerUsage: WorkerUsage,
f8eb0a2a
JB
1074 message: MessageValue<Response>
1075 ): void {
6703b9f4 1076 if (message.workerError != null) {
dc021bcc
JB
1077 return
1078 }
e4f20deb
JB
1079 updateMeasurementStatistics(
1080 workerUsage.runTime,
1081 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
dc021bcc 1082 message.taskPerformance?.runTime ?? 0
e4f20deb 1083 )
f8eb0a2a
JB
1084 }
1085
a4e07f72
JB
1086 private updateWaitTimeWorkerUsage (
1087 workerUsage: WorkerUsage,
1c6fe997 1088 task: Task<Data>
f8eb0a2a 1089 ): void {
1c6fe997
JB
1090 const timestamp = performance.now()
1091 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
e4f20deb
JB
1092 updateMeasurementStatistics(
1093 workerUsage.waitTime,
1094 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
dc021bcc 1095 taskWaitTime
e4f20deb 1096 )
c01733f1 1097 }
1098
a4e07f72 1099 private updateEluWorkerUsage (
5df69fab 1100 workerUsage: WorkerUsage,
62c15a68
JB
1101 message: MessageValue<Response>
1102 ): void {
6703b9f4 1103 if (message.workerError != null) {
dc021bcc
JB
1104 return
1105 }
008512c7
JB
1106 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
1107 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
e4f20deb
JB
1108 updateMeasurementStatistics(
1109 workerUsage.elu.active,
008512c7 1110 eluTaskStatisticsRequirements,
dc021bcc 1111 message.taskPerformance?.elu?.active ?? 0
e4f20deb
JB
1112 )
1113 updateMeasurementStatistics(
1114 workerUsage.elu.idle,
008512c7 1115 eluTaskStatisticsRequirements,
dc021bcc 1116 message.taskPerformance?.elu?.idle ?? 0
e4f20deb 1117 )
008512c7 1118 if (eluTaskStatisticsRequirements.aggregate) {
f7510105 1119 if (message.taskPerformance?.elu != null) {
f7510105
JB
1120 if (workerUsage.elu.utilization != null) {
1121 workerUsage.elu.utilization =
1122 (workerUsage.elu.utilization +
1123 message.taskPerformance.elu.utilization) /
1124 2
1125 } else {
1126 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
1127 }
62c15a68
JB
1128 }
1129 }
1130 }
1131
280c2a77 1132 /**
f06e48d8 1133 * Chooses a worker node for the next task.
280c2a77 1134 *
6c6afb84 1135 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 1136 *
aa9eede8 1137 * @returns The chosen worker node key
280c2a77 1138 */
6c6afb84 1139 private chooseWorkerNode (): number {
930dcf12 1140 if (this.shallCreateDynamicWorker()) {
aa9eede8 1141 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
6c6afb84 1142 if (
b1aae695 1143 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
6c6afb84 1144 ) {
aa9eede8 1145 return workerNodeKey
6c6afb84 1146 }
17393ac8 1147 }
930dcf12
JB
1148 return this.workerChoiceStrategyContext.execute()
1149 }
1150
6c6afb84
JB
1151 /**
1152 * Conditions for dynamic worker creation.
1153 *
1154 * @returns Whether to create a dynamic worker or not.
1155 */
1156 private shallCreateDynamicWorker (): boolean {
930dcf12 1157 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
1158 }
1159
280c2a77 1160 /**
aa9eede8 1161 * Sends a message to worker given its worker node key.
280c2a77 1162 *
aa9eede8 1163 * @param workerNodeKey - The worker node key.
38e795c1 1164 * @param message - The message.
7d91a8cd 1165 * @param transferList - The optional array of transferable objects.
280c2a77
S
1166 */
1167 protected abstract sendToWorker (
aa9eede8 1168 workerNodeKey: number,
7d91a8cd
JB
1169 message: MessageValue<Data>,
1170 transferList?: TransferListItem[]
280c2a77
S
1171 ): void
1172
729c563d 1173 /**
41344292 1174 * Creates a new worker.
6c6afb84
JB
1175 *
1176 * @returns Newly created worker.
729c563d 1177 */
280c2a77 1178 protected abstract createWorker (): Worker
c97c7edb 1179
4a6952ff 1180 /**
aa9eede8 1181 * Creates a new, completely set up worker node.
4a6952ff 1182 *
aa9eede8 1183 * @returns New, completely set up worker node key.
4a6952ff 1184 */
aa9eede8 1185 protected createAndSetupWorkerNode (): number {
bdacc2d2 1186 const worker = this.createWorker()
280c2a77 1187
fd04474e 1188 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
35cf1c03 1189 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 1190 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
041dc05b 1191 worker.on('error', error => {
aad6fb64 1192 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
46b0bb09 1193 const workerInfo = this.getWorkerInfo(workerNodeKey)
9b106837 1194 workerInfo.ready = false
0dc838e3 1195 this.workerNodes[workerNodeKey].closeChannel()
2a69b8c5 1196 this.emitter?.emit(PoolEvents.error, error)
15b176e0
JB
1197 if (
1198 this.opts.restartWorkerOnError === true &&
b6bfca01
JB
1199 this.started &&
1200 !this.starting
15b176e0 1201 ) {
9b106837 1202 if (workerInfo.dynamic) {
aa9eede8 1203 this.createAndSetupDynamicWorkerNode()
8a1260a3 1204 } else {
aa9eede8 1205 this.createAndSetupWorkerNode()
8a1260a3 1206 }
5baee0d7 1207 }
19dbc45b 1208 if (this.opts.enableTasksQueue === true) {
9b106837 1209 this.redistributeQueuedTasks(workerNodeKey)
19dbc45b 1210 }
5baee0d7 1211 })
a35560ba 1212 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 1213 worker.once('exit', () => {
f06e48d8 1214 this.removeWorkerNode(worker)
a974afa6 1215 })
280c2a77 1216
aa9eede8 1217 const workerNodeKey = this.addWorkerNode(worker)
280c2a77 1218
aa9eede8 1219 this.afterWorkerNodeSetup(workerNodeKey)
280c2a77 1220
aa9eede8 1221 return workerNodeKey
c97c7edb 1222 }
be0676b3 1223
930dcf12 1224 /**
aa9eede8 1225 * Creates a new, completely set up dynamic worker node.
930dcf12 1226 *
aa9eede8 1227 * @returns New, completely set up dynamic worker node key.
930dcf12 1228 */
aa9eede8
JB
1229 protected createAndSetupDynamicWorkerNode (): number {
1230 const workerNodeKey = this.createAndSetupWorkerNode()
041dc05b 1231 this.registerWorkerMessageListener(workerNodeKey, message => {
aa9eede8
JB
1232 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1233 message.workerId
aad6fb64 1234 )
aa9eede8 1235 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
81c02522 1236 // Kill message received from worker
930dcf12
JB
1237 if (
1238 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1e3214b6 1239 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
7b56f532 1240 ((this.opts.enableTasksQueue === false &&
aa9eede8 1241 workerUsage.tasks.executing === 0) ||
7b56f532 1242 (this.opts.enableTasksQueue === true &&
aa9eede8
JB
1243 workerUsage.tasks.executing === 0 &&
1244 this.tasksQueueSize(localWorkerNodeKey) === 0)))
930dcf12 1245 ) {
041dc05b 1246 this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
5270d253
JB
1247 this.emitter?.emit(PoolEvents.error, error)
1248 })
930dcf12
JB
1249 }
1250 })
46b0bb09 1251 const workerInfo = this.getWorkerInfo(workerNodeKey)
aa9eede8 1252 this.sendToWorker(workerNodeKey, {
e9dd5b66 1253 checkActive: true
21f710aa 1254 })
72ae84a2
JB
1255 if (this.taskFunctions.size > 0) {
1256 for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
1257 this.sendTaskFunctionOperationToWorker(workerNodeKey, {
1258 taskFunctionOperation: 'add',
1259 taskFunctionName,
1260 taskFunction: taskFunction.toString()
1261 }).catch(error => {
1262 this.emitter?.emit(PoolEvents.error, error)
1263 })
1264 }
1265 }
b5e113f6 1266 workerInfo.dynamic = true
b1aae695
JB
1267 if (
1268 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1269 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1270 ) {
b5e113f6
JB
1271 workerInfo.ready = true
1272 }
33e6bb4c 1273 this.checkAndEmitDynamicWorkerCreationEvents()
aa9eede8 1274 return workerNodeKey
930dcf12
JB
1275 }
1276
a2ed5053 1277 /**
aa9eede8 1278 * Registers a listener callback on the worker given its worker node key.
a2ed5053 1279 *
aa9eede8 1280 * @param workerNodeKey - The worker node key.
a2ed5053
JB
1281 * @param listener - The message listener callback.
1282 */
85aeb3f3
JB
1283 protected abstract registerWorkerMessageListener<
1284 Message extends Data | Response
aa9eede8
JB
1285 >(
1286 workerNodeKey: number,
1287 listener: (message: MessageValue<Message>) => void
1288 ): void
a2ed5053
JB
1289
1290 /**
aa9eede8 1291 * Method hooked up after a worker node has been newly created.
a2ed5053
JB
1292 * Can be overridden.
1293 *
aa9eede8 1294 * @param workerNodeKey - The newly created worker node key.
a2ed5053 1295 */
aa9eede8 1296 protected afterWorkerNodeSetup (workerNodeKey: number): void {
a2ed5053 1297 // Listen to worker messages.
aa9eede8 1298 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
85aeb3f3 1299 // Send the startup message to worker.
aa9eede8 1300 this.sendStartupMessageToWorker(workerNodeKey)
9edb9717
JB
1301 // Send the statistics message to worker.
1302 this.sendStatisticsMessageToWorker(workerNodeKey)
72695f86 1303 if (this.opts.enableTasksQueue === true) {
dbd73092 1304 if (this.opts.tasksQueueOptions?.taskStealing === true) {
47352846
JB
1305 this.workerNodes[workerNodeKey].onEmptyQueue =
1306 this.taskStealingOnEmptyQueue.bind(this)
1307 }
1308 if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
1309 this.workerNodes[workerNodeKey].onBackPressure =
1310 this.tasksStealingOnBackPressure.bind(this)
1311 }
72695f86 1312 }
d2c73f82
JB
1313 }
1314
85aeb3f3 1315 /**
aa9eede8
JB
1316 * Sends the startup message to worker given its worker node key.
1317 *
1318 * @param workerNodeKey - The worker node key.
1319 */
1320 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1321
1322 /**
9edb9717 1323 * Sends the statistics message to worker given its worker node key.
85aeb3f3 1324 *
aa9eede8 1325 * @param workerNodeKey - The worker node key.
85aeb3f3 1326 */
9edb9717 1327 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
aa9eede8
JB
1328 this.sendToWorker(workerNodeKey, {
1329 statistics: {
1330 runTime:
1331 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1332 .runTime.aggregate,
1333 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1334 .elu.aggregate
72ae84a2 1335 }
aa9eede8
JB
1336 })
1337 }
a2ed5053
JB
1338
1339 private redistributeQueuedTasks (workerNodeKey: number): void {
1340 while (this.tasksQueueSize(workerNodeKey) > 0) {
f201a0cd
JB
1341 const destinationWorkerNodeKey = this.workerNodes.reduce(
1342 (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
852ed3e4
JB
1343 return workerNode.info.ready &&
1344 workerNode.usage.tasks.queued <
1345 workerNodes[minWorkerNodeKey].usage.tasks.queued
f201a0cd
JB
1346 ? workerNodeKey
1347 : minWorkerNodeKey
1348 },
1349 0
1350 )
72ae84a2 1351 const task = this.dequeueTask(workerNodeKey) as Task<Data>
3f690f25
JB
1352 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1353 this.executeTask(destinationWorkerNodeKey, task)
1354 } else {
1355 this.enqueueTask(destinationWorkerNodeKey, task)
dd951876
JB
1356 }
1357 }
1358 }
1359
b1838604
JB
1360 private updateTaskStolenStatisticsWorkerUsage (
1361 workerNodeKey: number,
b1838604
JB
1362 taskName: string
1363 ): void {
1a880eca 1364 const workerNode = this.workerNodes[workerNodeKey]
b1838604
JB
1365 if (workerNode?.usage != null) {
1366 ++workerNode.usage.tasks.stolen
1367 }
1368 if (
1369 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1370 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1371 ) {
1372 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1373 taskName
1374 ) as WorkerUsage
1375 ++taskFunctionWorkerUsage.tasks.stolen
1376 }
1377 }
1378
dd951876 1379 private taskStealingOnEmptyQueue (workerId: number): void {
a6b3272b 1380 const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
dd951876 1381 const workerNodes = this.workerNodes
a6b3272b 1382 .slice()
dd951876
JB
1383 .sort(
1384 (workerNodeA, workerNodeB) =>
1385 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1386 )
f201a0cd 1387 const sourceWorkerNode = workerNodes.find(
041dc05b 1388 workerNode =>
f201a0cd
JB
1389 workerNode.info.ready &&
1390 workerNode.info.id !== workerId &&
1391 workerNode.usage.tasks.queued > 0
1392 )
1393 if (sourceWorkerNode != null) {
72ae84a2 1394 const task = sourceWorkerNode.popTask() as Task<Data>
f201a0cd
JB
1395 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1396 this.executeTask(destinationWorkerNodeKey, task)
1397 } else {
1398 this.enqueueTask(destinationWorkerNodeKey, task)
72695f86 1399 }
f201a0cd
JB
1400 this.updateTaskStolenStatisticsWorkerUsage(
1401 destinationWorkerNodeKey,
1402 task.name as string
1403 )
72695f86
JB
1404 }
1405 }
1406
1407 private tasksStealingOnBackPressure (workerId: number): void {
f778c355
JB
1408 const sizeOffset = 1
1409 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
68dbcdc0
JB
1410 return
1411 }
72695f86
JB
1412 const sourceWorkerNode =
1413 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
1414 const workerNodes = this.workerNodes
a6b3272b 1415 .slice()
72695f86
JB
1416 .sort(
1417 (workerNodeA, workerNodeB) =>
1418 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1419 )
1420 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1421 if (
0bc68267 1422 sourceWorkerNode.usage.tasks.queued > 0 &&
a6b3272b
JB
1423 workerNode.info.ready &&
1424 workerNode.info.id !== workerId &&
0bc68267 1425 workerNode.usage.tasks.queued <
f778c355 1426 (this.opts.tasksQueueOptions?.size as number) - sizeOffset
72695f86 1427 ) {
72ae84a2 1428 const task = sourceWorkerNode.popTask() as Task<Data>
375f7504 1429 if (this.shallExecuteTask(workerNodeKey)) {
dd951876 1430 this.executeTask(workerNodeKey, task)
4de3d785 1431 } else {
dd951876 1432 this.enqueueTask(workerNodeKey, task)
4de3d785 1433 }
b1838604
JB
1434 this.updateTaskStolenStatisticsWorkerUsage(
1435 workerNodeKey,
b1838604
JB
1436 task.name as string
1437 )
10ecf8fd 1438 }
a2ed5053
JB
1439 }
1440 }
1441
be0676b3 1442 /**
aa9eede8 1443 * This method is the listener registered for each worker message.
be0676b3 1444 *
bdacc2d2 1445 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
1446 */
1447 protected workerListener (): (message: MessageValue<Response>) => void {
041dc05b 1448 return message => {
21f710aa 1449 this.checkMessageWorkerId(message)
6703b9f4 1450 if (message.ready != null && message.taskFunctionNames != null) {
81c02522 1451 // Worker ready response received from worker
10e2aa7e 1452 this.handleWorkerReadyResponse(message)
7629bdf1 1453 } else if (message.taskId != null) {
81c02522 1454 // Task execution response received from worker
6b272951 1455 this.handleTaskExecutionResponse(message)
6703b9f4
JB
1456 } else if (message.taskFunctionNames != null) {
1457 // Task function names message received from worker
46b0bb09
JB
1458 this.getWorkerInfo(
1459 this.getWorkerNodeKeyByWorkerId(message.workerId)
6703b9f4 1460 ).taskFunctionNames = message.taskFunctionNames
6b272951
JB
1461 }
1462 }
1463 }
1464
10e2aa7e 1465 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
f05ed93c 1466 if (message.ready === false) {
72ae84a2
JB
1467 throw new Error(
1468 `Worker ${message.workerId as number} failed to initialize`
1469 )
f05ed93c 1470 }
a5d15204 1471 const workerInfo = this.getWorkerInfo(
aad6fb64 1472 this.getWorkerNodeKeyByWorkerId(message.workerId)
46b0bb09 1473 )
a5d15204 1474 workerInfo.ready = message.ready as boolean
6703b9f4 1475 workerInfo.taskFunctionNames = message.taskFunctionNames
2431bdb4
JB
1476 if (this.emitter != null && this.ready) {
1477 this.emitter.emit(PoolEvents.ready, this.info)
1478 }
6b272951
JB
1479 }
1480
1481 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
6703b9f4 1482 const { taskId, workerError, data } = message
5441aea6 1483 const promiseResponse = this.promiseResponseMap.get(taskId as string)
6b272951 1484 if (promiseResponse != null) {
6703b9f4
JB
1485 if (workerError != null) {
1486 this.emitter?.emit(PoolEvents.taskError, workerError)
1487 promiseResponse.reject(workerError.message)
6b272951 1488 } else {
5441aea6 1489 promiseResponse.resolve(data as Response)
6b272951 1490 }
501aea93
JB
1491 const workerNodeKey = promiseResponse.workerNodeKey
1492 this.afterTaskExecutionHook(workerNodeKey, message)
f3a91bac 1493 this.workerChoiceStrategyContext.update(workerNodeKey)
5441aea6 1494 this.promiseResponseMap.delete(taskId as string)
6b272951
JB
1495 if (
1496 this.opts.enableTasksQueue === true &&
b5e113f6
JB
1497 this.tasksQueueSize(workerNodeKey) > 0 &&
1498 this.workerNodes[workerNodeKey].usage.tasks.executing <
1499 (this.opts.tasksQueueOptions?.concurrency as number)
6b272951
JB
1500 ) {
1501 this.executeTask(
1502 workerNodeKey,
1503 this.dequeueTask(workerNodeKey) as Task<Data>
1504 )
be0676b3
APA
1505 }
1506 }
be0676b3 1507 }
7c0ba920 1508
a1763c54 1509 private checkAndEmitTaskExecutionEvents (): void {
33e6bb4c
JB
1510 if (this.busy) {
1511 this.emitter?.emit(PoolEvents.busy, this.info)
a1763c54
JB
1512 }
1513 }
1514
1515 private checkAndEmitTaskQueuingEvents (): void {
1516 if (this.hasBackPressure()) {
1517 this.emitter?.emit(PoolEvents.backPressure, this.info)
164d950a
JB
1518 }
1519 }
1520
33e6bb4c
JB
1521 private checkAndEmitDynamicWorkerCreationEvents (): void {
1522 if (this.type === PoolTypes.dynamic) {
1523 if (this.full) {
1524 this.emitter?.emit(PoolEvents.full, this.info)
1525 }
1526 }
1527 }
1528
8a1260a3 1529 /**
aa9eede8 1530 * Gets the worker information given its worker node key.
8a1260a3
JB
1531 *
1532 * @param workerNodeKey - The worker node key.
3f09ed9f 1533 * @returns The worker information.
8a1260a3 1534 */
46b0bb09
JB
1535 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
1536 return this.workerNodes[workerNodeKey].info
e221309a
JB
1537 }
1538
a05c10de 1539 /**
b0a4db63 1540 * Adds the given worker in the pool worker nodes.
ea7a90d3 1541 *
38e795c1 1542 * @param worker - The worker.
aa9eede8
JB
1543 * @returns The added worker node key.
1544 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
ea7a90d3 1545 */
b0a4db63 1546 private addWorkerNode (worker: Worker): number {
671d5154
JB
1547 const workerNode = new WorkerNode<Worker, Data>(
1548 worker,
ff3f866a 1549 this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
671d5154 1550 )
b97d82d8 1551 // Flag the worker node as ready at pool startup.
d2c73f82
JB
1552 if (this.starting) {
1553 workerNode.info.ready = true
1554 }
aa9eede8 1555 this.workerNodes.push(workerNode)
aad6fb64 1556 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
aa9eede8 1557 if (workerNodeKey === -1) {
86ed0598 1558 throw new Error('Worker added not found in worker nodes')
aa9eede8
JB
1559 }
1560 return workerNodeKey
ea7a90d3 1561 }
c923ce56 1562
51fe3d3c 1563 /**
f06e48d8 1564 * Removes the given worker from the pool worker nodes.
51fe3d3c 1565 *
f06e48d8 1566 * @param worker - The worker.
51fe3d3c 1567 */
416fd65c 1568 private removeWorkerNode (worker: Worker): void {
aad6fb64 1569 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1f68cede
JB
1570 if (workerNodeKey !== -1) {
1571 this.workerNodes.splice(workerNodeKey, 1)
1572 this.workerChoiceStrategyContext.remove(workerNodeKey)
1573 }
51fe3d3c 1574 }
adc3c320 1575
e2b31e32
JB
1576 /** @inheritDoc */
1577 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
9e844245 1578 return (
e2b31e32
JB
1579 this.opts.enableTasksQueue === true &&
1580 this.workerNodes[workerNodeKey].hasBackPressure()
9e844245
JB
1581 )
1582 }
1583
1584 private hasBackPressure (): boolean {
1585 return (
1586 this.opts.enableTasksQueue === true &&
1587 this.workerNodes.findIndex(
041dc05b 1588 workerNode => !workerNode.hasBackPressure()
a1763c54 1589 ) === -1
9e844245 1590 )
e2b31e32
JB
1591 }
1592
b0a4db63 1593 /**
aa9eede8 1594 * Executes the given task on the worker given its worker node key.
b0a4db63 1595 *
aa9eede8 1596 * @param workerNodeKey - The worker node key.
b0a4db63
JB
1597 * @param task - The task to execute.
1598 */
2e81254d 1599 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1600 this.beforeTaskExecutionHook(workerNodeKey, task)
bbfa38a2 1601 this.sendToWorker(workerNodeKey, task, task.transferList)
a1763c54 1602 this.checkAndEmitTaskExecutionEvents()
2e81254d
JB
1603 }
1604
f9f00b5f 1605 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
a1763c54
JB
1606 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1607 this.checkAndEmitTaskQueuingEvents()
1608 return tasksQueueSize
adc3c320
JB
1609 }
1610
416fd65c 1611 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1612 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1613 }
1614
416fd65c 1615 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1616 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1617 }
1618
81c02522 1619 protected flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1620 while (this.tasksQueueSize(workerNodeKey) > 0) {
1621 this.executeTask(
1622 workerNodeKey,
1623 this.dequeueTask(workerNodeKey) as Task<Data>
1624 )
ff733df7 1625 }
4b628b48 1626 this.workerNodes[workerNodeKey].clearTasksQueue()
ff733df7
JB
1627 }
1628
ef41a6e6
JB
1629 private flushTasksQueues (): void {
1630 for (const [workerNodeKey] of this.workerNodes.entries()) {
1631 this.flushTasksQueue(workerNodeKey)
1632 }
1633 }
c97c7edb 1634}