refactor: use pool side task functions list for hasTaskFunction()
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
76d91ea0 8 Task
5c4d16da 9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
dc021bcc 14 average,
59317253 15 isKillBehavior,
0d80593b 16 isPlainObject,
90d6701c 17 max,
afe0d5bf 18 median,
90d6701c 19 min,
e4f20deb
JB
20 round,
21 updateMeasurementStatistics
bbeadd16 22} from '../utils'
59317253 23import { KillBehaviors } from '../worker/worker-options'
6703b9f4 24import type { TaskFunction } from '../worker/task-functions'
c4855468 25import {
65d7a1c9 26 type IPool,
7c5a1080 27 PoolEmitter,
c4855468 28 PoolEvents,
6b27d407 29 type PoolInfo,
c4855468 30 type PoolOptions,
6b27d407
JB
31 type PoolType,
32 PoolTypes,
4b628b48 33 type TasksQueueOptions
c4855468 34} from './pool'
bbfa38a2
JB
35import type {
36 IWorker,
37 IWorkerNode,
38 WorkerInfo,
39 WorkerType,
40 WorkerUsage
e102732c 41} from './worker'
a35560ba 42import {
008512c7 43 type MeasurementStatisticsRequirements,
f0d7f803 44 Measurements,
a35560ba 45 WorkerChoiceStrategies,
a20f0ba5
JB
46 type WorkerChoiceStrategy,
47 type WorkerChoiceStrategyOptions
bdaf31cd
JB
48} from './selection-strategies/selection-strategies-types'
49import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 50import { version } from './version'
4b628b48 51import { WorkerNode } from './worker-node'
23ccf9d7 52
729c563d 53/**
ea7a90d3 54 * Base class that implements some shared logic for all poolifier pools.
729c563d 55 *
38e795c1 56 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
57 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
58 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 59 */
c97c7edb 60export abstract class AbstractPool<
f06e48d8 61 Worker extends IWorker,
d3c8a1a8
S
62 Data = unknown,
63 Response = unknown
c4855468 64> implements IPool<Worker, Data, Response> {
afc003b2 65 /** @inheritDoc */
4b628b48 66 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 67
afc003b2 68 /** @inheritDoc */
7c0ba920
JB
69 public readonly emitter?: PoolEmitter
70
be0676b3 71 /**
52b71763 72 * The task execution response promise map.
be0676b3 73 *
2740a743 74 * - `key`: The message id of each submitted task.
a3445496 75 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 76 *
a3445496 77 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 78 */
501aea93
JB
79 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
80 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 81
a35560ba 82 /**
51fe3d3c 83 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
84 */
85 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
86 Worker,
87 Data,
88 Response
a35560ba
S
89 >
90
8735b4e5
JB
91 /**
92 * Dynamic pool maximum size property placeholder.
93 */
94 protected readonly max?: number
95
075e51d1 96 /**
adc9cc64 97 * Whether the pool is starting or not.
075e51d1
JB
98 */
99 private readonly starting: boolean
15b176e0
JB
100 /**
101 * Whether the pool is started or not.
102 */
103 private started: boolean
afe0d5bf
JB
104 /**
105 * The start timestamp of the pool.
106 */
107 private readonly startTimestamp
108
729c563d
S
109 /**
110 * Constructs a new poolifier pool.
111 *
38e795c1 112 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 113 * @param filePath - Path to the worker file.
38e795c1 114 * @param opts - Options for the pool.
729c563d 115 */
c97c7edb 116 public constructor (
b4213b7f
JB
117 protected readonly numberOfWorkers: number,
118 protected readonly filePath: string,
119 protected readonly opts: PoolOptions<Worker>
c97c7edb 120 ) {
78cea37e 121 if (!this.isMain()) {
04f45163 122 throw new Error(
8c6d4acf 123 'Cannot start a pool from a worker with the same type as the pool'
04f45163 124 )
c97c7edb 125 }
8d3782fa 126 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 127 this.checkFilePath(this.filePath)
7c0ba920 128 this.checkPoolOptions(this.opts)
1086026a 129
7254e419
JB
130 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
131 this.executeTask = this.executeTask.bind(this)
132 this.enqueueTask = this.enqueueTask.bind(this)
1086026a 133
6bd72cd0 134 if (this.opts.enableEvents === true) {
7c0ba920
JB
135 this.emitter = new PoolEmitter()
136 }
d59df138
JB
137 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
138 Worker,
139 Data,
140 Response
da309861
JB
141 >(
142 this,
143 this.opts.workerChoiceStrategy,
144 this.opts.workerChoiceStrategyOptions
145 )
b6b32453
JB
146
147 this.setupHook()
148
075e51d1 149 this.starting = true
e761c033 150 this.startPool()
075e51d1 151 this.starting = false
15b176e0 152 this.started = true
afe0d5bf
JB
153
154 this.startTimestamp = performance.now()
c97c7edb
S
155 }
156
a35560ba 157 private checkFilePath (filePath: string): void {
ffcbbad8
JB
158 if (
159 filePath == null ||
3d6dd312 160 typeof filePath !== 'string' ||
ffcbbad8
JB
161 (typeof filePath === 'string' && filePath.trim().length === 0)
162 ) {
c510fea7
APA
163 throw new Error('Please specify a file with a worker implementation')
164 }
3d6dd312
JB
165 if (!existsSync(filePath)) {
166 throw new Error(`Cannot find the worker file '${filePath}'`)
167 }
c510fea7
APA
168 }
169
8d3782fa
JB
170 private checkNumberOfWorkers (numberOfWorkers: number): void {
171 if (numberOfWorkers == null) {
172 throw new Error(
173 'Cannot instantiate a pool without specifying the number of workers'
174 )
78cea37e 175 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 176 throw new TypeError(
0d80593b 177 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
178 )
179 } else if (numberOfWorkers < 0) {
473c717a 180 throw new RangeError(
8d3782fa
JB
181 'Cannot instantiate a pool with a negative number of workers'
182 )
6b27d407 183 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
2431bdb4
JB
184 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
185 }
186 }
187
188 protected checkDynamicPoolSize (min: number, max: number): void {
079de991 189 if (this.type === PoolTypes.dynamic) {
a5ed75b7 190 if (max == null) {
e695d66f 191 throw new TypeError(
a5ed75b7
JB
192 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
193 )
194 } else if (!Number.isSafeInteger(max)) {
2761efb4
JB
195 throw new TypeError(
196 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
197 )
198 } else if (min > max) {
079de991
JB
199 throw new RangeError(
200 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
201 )
b97d82d8 202 } else if (max === 0) {
079de991 203 throw new RangeError(
d640b48b 204 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
079de991
JB
205 )
206 } else if (min === max) {
207 throw new RangeError(
208 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
209 )
210 }
8d3782fa
JB
211 }
212 }
213
7c0ba920 214 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
215 if (isPlainObject(opts)) {
216 this.opts.workerChoiceStrategy =
217 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
218 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
8990357d
JB
219 this.opts.workerChoiceStrategyOptions = {
220 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
221 ...opts.workerChoiceStrategyOptions
222 }
49be33fe
JB
223 this.checkValidWorkerChoiceStrategyOptions(
224 this.opts.workerChoiceStrategyOptions
225 )
1f68cede 226 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
227 this.opts.enableEvents = opts.enableEvents ?? true
228 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
229 if (this.opts.enableTasksQueue) {
230 this.checkValidTasksQueueOptions(
231 opts.tasksQueueOptions as TasksQueueOptions
232 )
233 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
234 opts.tasksQueueOptions as TasksQueueOptions
235 )
236 }
237 } else {
238 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 239 }
aee46736
JB
240 }
241
242 private checkValidWorkerChoiceStrategy (
243 workerChoiceStrategy: WorkerChoiceStrategy
244 ): void {
245 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 246 throw new Error(
aee46736 247 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
248 )
249 }
7c0ba920
JB
250 }
251
0d80593b
JB
252 private checkValidWorkerChoiceStrategyOptions (
253 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
254 ): void {
255 if (!isPlainObject(workerChoiceStrategyOptions)) {
256 throw new TypeError(
257 'Invalid worker choice strategy options: must be a plain object'
258 )
259 }
8990357d 260 if (
8c0b113f
JB
261 workerChoiceStrategyOptions.retries != null &&
262 !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
8990357d
JB
263 ) {
264 throw new TypeError(
8c0b113f 265 'Invalid worker choice strategy options: retries must be an integer'
8990357d
JB
266 )
267 }
268 if (
8c0b113f
JB
269 workerChoiceStrategyOptions.retries != null &&
270 workerChoiceStrategyOptions.retries < 0
8990357d
JB
271 ) {
272 throw new RangeError(
8c0b113f 273 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
8990357d
JB
274 )
275 }
49be33fe
JB
276 if (
277 workerChoiceStrategyOptions.weights != null &&
6b27d407 278 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
279 ) {
280 throw new Error(
281 'Invalid worker choice strategy options: must have a weight for each worker node'
282 )
283 }
f0d7f803
JB
284 if (
285 workerChoiceStrategyOptions.measurement != null &&
286 !Object.values(Measurements).includes(
287 workerChoiceStrategyOptions.measurement
288 )
289 ) {
290 throw new Error(
291 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
292 )
293 }
0d80593b
JB
294 }
295
a20f0ba5 296 private checkValidTasksQueueOptions (
76d91ea0 297 tasksQueueOptions: TasksQueueOptions
a20f0ba5 298 ): void {
0d80593b
JB
299 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
300 throw new TypeError('Invalid tasks queue options: must be a plain object')
301 }
f0d7f803 302 if (
b7d085c4
JB
303 tasksQueueOptions?.concurrency != null &&
304 !Number.isSafeInteger(tasksQueueOptions?.concurrency)
f0d7f803
JB
305 ) {
306 throw new TypeError(
20c6f652 307 'Invalid worker node tasks concurrency: must be an integer'
f0d7f803
JB
308 )
309 }
310 if (
b7d085c4
JB
311 tasksQueueOptions?.concurrency != null &&
312 tasksQueueOptions?.concurrency <= 0
f0d7f803 313 ) {
e695d66f 314 throw new RangeError(
b7d085c4 315 `Invalid worker node tasks concurrency: ${tasksQueueOptions?.concurrency} is a negative integer or zero`
20c6f652
JB
316 )
317 }
b7d085c4 318 if (tasksQueueOptions?.queueMaxSize != null) {
ff3f866a 319 throw new Error(
68dbcdc0 320 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
20c6f652
JB
321 )
322 }
323 if (
b7d085c4
JB
324 tasksQueueOptions?.size != null &&
325 !Number.isSafeInteger(tasksQueueOptions?.size)
20c6f652 326 ) {
ff3f866a 327 throw new TypeError(
68dbcdc0 328 'Invalid worker node tasks queue size: must be an integer'
ff3f866a
JB
329 )
330 }
b7d085c4 331 if (tasksQueueOptions?.size != null && tasksQueueOptions?.size <= 0) {
20c6f652 332 throw new RangeError(
b7d085c4 333 `Invalid worker node tasks queue size: ${tasksQueueOptions?.size} is a negative integer or zero`
a20f0ba5
JB
334 )
335 }
336 }
337
e761c033
JB
338 private startPool (): void {
339 while (
340 this.workerNodes.reduce(
341 (accumulator, workerNode) =>
342 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
343 0
344 ) < this.numberOfWorkers
345 ) {
aa9eede8 346 this.createAndSetupWorkerNode()
e761c033
JB
347 }
348 }
349
08f3f44c 350 /** @inheritDoc */
6b27d407
JB
351 public get info (): PoolInfo {
352 return {
23ccf9d7 353 version,
6b27d407 354 type: this.type,
184855e6 355 worker: this.worker,
2431bdb4
JB
356 ready: this.ready,
357 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
6b27d407
JB
358 minSize: this.minSize,
359 maxSize: this.maxSize,
c05f0d50
JB
360 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
361 .runTime.aggregate &&
1305e9a8
JB
362 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
363 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
364 workerNodes: this.workerNodes.length,
365 idleWorkerNodes: this.workerNodes.reduce(
366 (accumulator, workerNode) =>
f59e1027 367 workerNode.usage.tasks.executing === 0
a4e07f72
JB
368 ? accumulator + 1
369 : accumulator,
6b27d407
JB
370 0
371 ),
372 busyWorkerNodes: this.workerNodes.reduce(
373 (accumulator, workerNode) =>
f59e1027 374 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
375 0
376 ),
a4e07f72 377 executedTasks: this.workerNodes.reduce(
6b27d407 378 (accumulator, workerNode) =>
f59e1027 379 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
380 0
381 ),
382 executingTasks: this.workerNodes.reduce(
383 (accumulator, workerNode) =>
f59e1027 384 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
385 0
386 ),
daf86646
JB
387 ...(this.opts.enableTasksQueue === true && {
388 queuedTasks: this.workerNodes.reduce(
389 (accumulator, workerNode) =>
390 accumulator + workerNode.usage.tasks.queued,
391 0
392 )
393 }),
394 ...(this.opts.enableTasksQueue === true && {
395 maxQueuedTasks: this.workerNodes.reduce(
396 (accumulator, workerNode) =>
397 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
398 0
399 )
400 }),
a1763c54
JB
401 ...(this.opts.enableTasksQueue === true && {
402 backPressure: this.hasBackPressure()
403 }),
68cbdc84
JB
404 ...(this.opts.enableTasksQueue === true && {
405 stolenTasks: this.workerNodes.reduce(
406 (accumulator, workerNode) =>
407 accumulator + workerNode.usage.tasks.stolen,
408 0
409 )
410 }),
a4e07f72
JB
411 failedTasks: this.workerNodes.reduce(
412 (accumulator, workerNode) =>
f59e1027 413 accumulator + workerNode.usage.tasks.failed,
a4e07f72 414 0
1dcf8b7b
JB
415 ),
416 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
417 .runTime.aggregate && {
418 runTime: {
98e72cda 419 minimum: round(
90d6701c 420 min(
98e72cda 421 ...this.workerNodes.map(
041dc05b 422 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
98e72cda 423 )
1dcf8b7b
JB
424 )
425 ),
98e72cda 426 maximum: round(
90d6701c 427 max(
98e72cda 428 ...this.workerNodes.map(
041dc05b 429 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
98e72cda 430 )
1dcf8b7b 431 )
98e72cda 432 ),
3baa0837
JB
433 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
434 .runTime.average && {
435 average: round(
436 average(
437 this.workerNodes.reduce<number[]>(
438 (accumulator, workerNode) =>
439 accumulator.concat(workerNode.usage.runTime.history),
440 []
441 )
98e72cda 442 )
dc021bcc 443 )
3baa0837 444 }),
98e72cda
JB
445 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
446 .runTime.median && {
447 median: round(
448 median(
3baa0837
JB
449 this.workerNodes.reduce<number[]>(
450 (accumulator, workerNode) =>
451 accumulator.concat(workerNode.usage.runTime.history),
452 []
98e72cda
JB
453 )
454 )
455 )
456 })
1dcf8b7b
JB
457 }
458 }),
459 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
460 .waitTime.aggregate && {
461 waitTime: {
98e72cda 462 minimum: round(
90d6701c 463 min(
98e72cda 464 ...this.workerNodes.map(
041dc05b 465 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
98e72cda 466 )
1dcf8b7b
JB
467 )
468 ),
98e72cda 469 maximum: round(
90d6701c 470 max(
98e72cda 471 ...this.workerNodes.map(
041dc05b 472 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
98e72cda 473 )
1dcf8b7b 474 )
98e72cda 475 ),
3baa0837
JB
476 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
477 .waitTime.average && {
478 average: round(
479 average(
480 this.workerNodes.reduce<number[]>(
481 (accumulator, workerNode) =>
482 accumulator.concat(workerNode.usage.waitTime.history),
483 []
484 )
98e72cda 485 )
dc021bcc 486 )
3baa0837 487 }),
98e72cda
JB
488 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
489 .waitTime.median && {
490 median: round(
491 median(
3baa0837
JB
492 this.workerNodes.reduce<number[]>(
493 (accumulator, workerNode) =>
494 accumulator.concat(workerNode.usage.waitTime.history),
495 []
98e72cda
JB
496 )
497 )
498 )
499 })
1dcf8b7b
JB
500 }
501 })
6b27d407
JB
502 }
503 }
08f3f44c 504
aa9eede8
JB
505 /**
506 * The pool readiness boolean status.
507 */
2431bdb4
JB
508 private get ready (): boolean {
509 return (
b97d82d8
JB
510 this.workerNodes.reduce(
511 (accumulator, workerNode) =>
512 !workerNode.info.dynamic && workerNode.info.ready
513 ? accumulator + 1
514 : accumulator,
515 0
516 ) >= this.minSize
2431bdb4
JB
517 )
518 }
519
afe0d5bf 520 /**
aa9eede8 521 * The approximate pool utilization.
afe0d5bf
JB
522 *
523 * @returns The pool utilization.
524 */
525 private get utilization (): number {
8e5ca040 526 const poolTimeCapacity =
fe7d90db 527 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
528 const totalTasksRunTime = this.workerNodes.reduce(
529 (accumulator, workerNode) =>
71514351 530 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
531 0
532 )
533 const totalTasksWaitTime = this.workerNodes.reduce(
534 (accumulator, workerNode) =>
71514351 535 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
536 0
537 )
8e5ca040 538 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
539 }
540
8881ae32 541 /**
aa9eede8 542 * The pool type.
8881ae32
JB
543 *
544 * If it is `'dynamic'`, it provides the `max` property.
545 */
546 protected abstract get type (): PoolType
547
184855e6 548 /**
aa9eede8 549 * The worker type.
184855e6
JB
550 */
551 protected abstract get worker (): WorkerType
552
c2ade475 553 /**
aa9eede8 554 * The pool minimum size.
c2ade475 555 */
8735b4e5
JB
556 protected get minSize (): number {
557 return this.numberOfWorkers
558 }
ff733df7
JB
559
560 /**
aa9eede8 561 * The pool maximum size.
ff733df7 562 */
8735b4e5
JB
563 protected get maxSize (): number {
564 return this.max ?? this.numberOfWorkers
565 }
a35560ba 566
6b813701
JB
567 /**
568 * Checks if the worker id sent in the received message from a worker is valid.
569 *
570 * @param message - The received message.
571 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
572 */
21f710aa 573 private checkMessageWorkerId (message: MessageValue<Response>): void {
310de0aa
JB
574 if (message.workerId == null) {
575 throw new Error('Worker message received without worker id')
576 } else if (
21f710aa 577 message.workerId != null &&
aad6fb64 578 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
21f710aa
JB
579 ) {
580 throw new Error(
581 `Worker message received from unknown worker '${message.workerId}'`
582 )
583 }
584 }
585
ffcbbad8 586 /**
f06e48d8 587 * Gets the given worker its worker node key.
ffcbbad8
JB
588 *
589 * @param worker - The worker.
f59e1027 590 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 591 */
aad6fb64 592 private getWorkerNodeKeyByWorker (worker: Worker): number {
f06e48d8 593 return this.workerNodes.findIndex(
041dc05b 594 workerNode => workerNode.worker === worker
f06e48d8 595 )
bf9549ae
JB
596 }
597
aa9eede8
JB
598 /**
599 * Gets the worker node key given its worker id.
600 *
601 * @param workerId - The worker id.
aad6fb64 602 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
aa9eede8 603 */
aad6fb64
JB
604 private getWorkerNodeKeyByWorkerId (workerId: number): number {
605 return this.workerNodes.findIndex(
041dc05b 606 workerNode => workerNode.info.id === workerId
aad6fb64 607 )
aa9eede8
JB
608 }
609
afc003b2 610 /** @inheritDoc */
a35560ba 611 public setWorkerChoiceStrategy (
59219cbb
JB
612 workerChoiceStrategy: WorkerChoiceStrategy,
613 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 614 ): void {
aee46736 615 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 616 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
617 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
618 this.opts.workerChoiceStrategy
619 )
620 if (workerChoiceStrategyOptions != null) {
621 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
622 }
aa9eede8 623 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
4b628b48 624 workerNode.resetUsage()
9edb9717 625 this.sendStatisticsMessageToWorker(workerNodeKey)
59219cbb 626 }
a20f0ba5
JB
627 }
628
629 /** @inheritDoc */
630 public setWorkerChoiceStrategyOptions (
631 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
632 ): void {
0d80593b 633 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
8990357d
JB
634 this.opts.workerChoiceStrategyOptions = {
635 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
636 ...workerChoiceStrategyOptions
637 }
a20f0ba5
JB
638 this.workerChoiceStrategyContext.setOptions(
639 this.opts.workerChoiceStrategyOptions
a35560ba
S
640 )
641 }
642
a20f0ba5 643 /** @inheritDoc */
8f52842f
JB
644 public enableTasksQueue (
645 enable: boolean,
646 tasksQueueOptions?: TasksQueueOptions
647 ): void {
a20f0ba5 648 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 649 this.flushTasksQueues()
a20f0ba5
JB
650 }
651 this.opts.enableTasksQueue = enable
8f52842f 652 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
653 }
654
655 /** @inheritDoc */
8f52842f 656 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 657 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
658 this.checkValidTasksQueueOptions(tasksQueueOptions)
659 this.opts.tasksQueueOptions =
660 this.buildTasksQueueOptions(tasksQueueOptions)
5b49e864 661 this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
5baee0d7 662 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
663 delete this.opts.tasksQueueOptions
664 }
665 }
666
5b49e864 667 private setTasksQueueSize (size: number): void {
20c6f652 668 for (const workerNode of this.workerNodes) {
ff3f866a 669 workerNode.tasksQueueBackPressureSize = size
20c6f652
JB
670 }
671 }
672
a20f0ba5
JB
673 private buildTasksQueueOptions (
674 tasksQueueOptions: TasksQueueOptions
675 ): TasksQueueOptions {
676 return {
20c6f652 677 ...{
ff3f866a 678 size: Math.pow(this.maxSize, 2),
20c6f652
JB
679 concurrency: 1
680 },
681 ...tasksQueueOptions
a20f0ba5
JB
682 }
683 }
684
c319c66b
JB
685 /**
686 * Whether the pool is full or not.
687 *
688 * The pool filling boolean status.
689 */
dea903a8
JB
690 protected get full (): boolean {
691 return this.workerNodes.length >= this.maxSize
692 }
c2ade475 693
c319c66b
JB
694 /**
695 * Whether the pool is busy or not.
696 *
697 * The pool busyness boolean status.
698 */
699 protected abstract get busy (): boolean
7c0ba920 700
6c6afb84 701 /**
3d76750a 702 * Whether worker nodes are executing concurrently their tasks quota or not.
6c6afb84
JB
703 *
704 * @returns Worker nodes busyness boolean status.
705 */
c2ade475 706 protected internalBusy (): boolean {
3d76750a
JB
707 if (this.opts.enableTasksQueue === true) {
708 return (
709 this.workerNodes.findIndex(
041dc05b 710 workerNode =>
3d76750a
JB
711 workerNode.info.ready &&
712 workerNode.usage.tasks.executing <
713 (this.opts.tasksQueueOptions?.concurrency as number)
714 ) === -1
715 )
716 } else {
717 return (
718 this.workerNodes.findIndex(
041dc05b 719 workerNode =>
3d76750a
JB
720 workerNode.info.ready && workerNode.usage.tasks.executing === 0
721 ) === -1
722 )
723 }
cb70b19d
JB
724 }
725
6703b9f4
JB
726 private sendToWorkers (message: Omit<MessageValue<Data>, 'workerId'>): number {
727 let messagesCount = 0
728 for (const [workerNodeKey] of this.workerNodes.entries()) {
729 this.sendToWorker(workerNodeKey, {
730 ...message,
731 workerId: this.getWorkerInfo(workerNodeKey).id as number
732 })
733 ++messagesCount
734 }
735 return messagesCount
736 }
737
738 /** @inheritDoc */
739 public hasTaskFunction (name: string): boolean {
edbc15c6
JB
740 for (const workerNode of this.workerNodes) {
741 if (
742 Array.isArray(workerNode.info.taskFunctionNames) &&
743 workerNode.info.taskFunctionNames.includes(name)
744 ) {
745 return true
746 }
747 }
748 return false
6703b9f4
JB
749 }
750
751 /** @inheritDoc */
752 public addTaskFunction (name: string, taskFunction: TaskFunction): boolean {
753 this.sendToWorkers({
754 taskFunctionOperation: 'add',
755 taskFunctionName: name,
756 taskFunction: taskFunction.toString()
757 })
758 return true
759 }
760
761 /** @inheritDoc */
762 public removeTaskFunction (name: string): boolean {
763 this.sendToWorkers({
764 taskFunctionOperation: 'remove',
765 taskFunctionName: name
766 })
767 return true
768 }
769
90d7d101 770 /** @inheritDoc */
6703b9f4 771 public listTaskFunctionNames (): string[] {
f2dbbf95
JB
772 for (const workerNode of this.workerNodes) {
773 if (
6703b9f4
JB
774 Array.isArray(workerNode.info.taskFunctionNames) &&
775 workerNode.info.taskFunctionNames.length > 0
f2dbbf95 776 ) {
6703b9f4 777 return workerNode.info.taskFunctionNames
f2dbbf95 778 }
90d7d101 779 }
f2dbbf95 780 return []
90d7d101
JB
781 }
782
6703b9f4
JB
783 /** @inheritDoc */
784 public setDefaultTaskFunction (name: string): boolean {
785 this.sendToWorkers({
786 taskFunctionOperation: 'default',
787 taskFunctionName: name
788 })
789 return true
790 }
791
375f7504
JB
792 private shallExecuteTask (workerNodeKey: number): boolean {
793 return (
794 this.tasksQueueSize(workerNodeKey) === 0 &&
795 this.workerNodes[workerNodeKey].usage.tasks.executing <
796 (this.opts.tasksQueueOptions?.concurrency as number)
797 )
798 }
799
afc003b2 800 /** @inheritDoc */
7d91a8cd
JB
801 public async execute (
802 data?: Data,
803 name?: string,
804 transferList?: TransferListItem[]
805 ): Promise<Response> {
52b71763 806 return await new Promise<Response>((resolve, reject) => {
15b176e0
JB
807 if (!this.started) {
808 reject(new Error('Cannot execute a task on destroyed pool'))
9d2d0da1 809 return
15b176e0 810 }
7d91a8cd
JB
811 if (name != null && typeof name !== 'string') {
812 reject(new TypeError('name argument must be a string'))
9d2d0da1 813 return
7d91a8cd 814 }
90d7d101
JB
815 if (
816 name != null &&
817 typeof name === 'string' &&
818 name.trim().length === 0
819 ) {
f58b60b9 820 reject(new TypeError('name argument must not be an empty string'))
9d2d0da1 821 return
90d7d101 822 }
b558f6b5
JB
823 if (transferList != null && !Array.isArray(transferList)) {
824 reject(new TypeError('transferList argument must be an array'))
9d2d0da1 825 return
b558f6b5
JB
826 }
827 const timestamp = performance.now()
828 const workerNodeKey = this.chooseWorkerNode()
501aea93 829 const task: Task<Data> = {
52b71763
JB
830 name: name ?? DEFAULT_TASK_NAME,
831 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
832 data: data ?? ({} as Data),
7d91a8cd 833 transferList,
52b71763 834 timestamp,
1a28f967 835 workerId: this.getWorkerInfo(workerNodeKey).id as number,
7629bdf1 836 taskId: randomUUID()
52b71763 837 }
7629bdf1 838 this.promiseResponseMap.set(task.taskId as string, {
2e81254d
JB
839 resolve,
840 reject,
501aea93 841 workerNodeKey
2e81254d 842 })
52b71763 843 if (
4e377863
JB
844 this.opts.enableTasksQueue === false ||
845 (this.opts.enableTasksQueue === true &&
375f7504 846 this.shallExecuteTask(workerNodeKey))
52b71763 847 ) {
501aea93 848 this.executeTask(workerNodeKey, task)
4e377863
JB
849 } else {
850 this.enqueueTask(workerNodeKey, task)
52b71763 851 }
2e81254d 852 })
280c2a77 853 }
c97c7edb 854
afc003b2 855 /** @inheritDoc */
c97c7edb 856 public async destroy (): Promise<void> {
1fbcaa7c 857 await Promise.all(
81c02522 858 this.workerNodes.map(async (_, workerNodeKey) => {
aa9eede8 859 await this.destroyWorkerNode(workerNodeKey)
1fbcaa7c
JB
860 })
861 )
33e6bb4c 862 this.emitter?.emit(PoolEvents.destroy, this.info)
15b176e0 863 this.started = false
c97c7edb
S
864 }
865
1e3214b6
JB
866 protected async sendKillMessageToWorker (
867 workerNodeKey: number,
868 workerId: number
869 ): Promise<void> {
9edb9717 870 await new Promise<void>((resolve, reject) => {
041dc05b 871 this.registerWorkerMessageListener(workerNodeKey, message => {
1e3214b6
JB
872 if (message.kill === 'success') {
873 resolve()
874 } else if (message.kill === 'failure') {
e1af34e6 875 reject(new Error(`Worker ${workerId} kill message handling failed`))
1e3214b6
JB
876 }
877 })
9edb9717 878 this.sendToWorker(workerNodeKey, { kill: true, workerId })
1e3214b6 879 })
1e3214b6
JB
880 }
881
4a6952ff 882 /**
aa9eede8 883 * Terminates the worker node given its worker node key.
4a6952ff 884 *
aa9eede8 885 * @param workerNodeKey - The worker node key.
4a6952ff 886 */
81c02522 887 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 888
729c563d 889 /**
6677a3d3
JB
890 * Setup hook to execute code before worker nodes are created in the abstract constructor.
891 * Can be overridden.
afc003b2
JB
892 *
893 * @virtual
729c563d 894 */
280c2a77 895 protected setupHook (): void {
965df41c 896 /* Intentionally empty */
280c2a77 897 }
c97c7edb 898
729c563d 899 /**
280c2a77
S
900 * Should return whether the worker is the main worker or not.
901 */
902 protected abstract isMain (): boolean
903
904 /**
2e81254d 905 * Hook executed before the worker task execution.
bf9549ae 906 * Can be overridden.
729c563d 907 *
f06e48d8 908 * @param workerNodeKey - The worker node key.
1c6fe997 909 * @param task - The task to execute.
729c563d 910 */
1c6fe997
JB
911 protected beforeTaskExecutionHook (
912 workerNodeKey: number,
913 task: Task<Data>
914 ): void {
94407def
JB
915 if (this.workerNodes[workerNodeKey]?.usage != null) {
916 const workerUsage = this.workerNodes[workerNodeKey].usage
917 ++workerUsage.tasks.executing
918 this.updateWaitTimeWorkerUsage(workerUsage, task)
919 }
920 if (
921 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
922 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
923 task.name as string
924 ) != null
925 ) {
db0e38ee 926 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 927 workerNodeKey
db0e38ee 928 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
5623b8d5
JB
929 ++taskFunctionWorkerUsage.tasks.executing
930 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
b558f6b5 931 }
c97c7edb
S
932 }
933
/**
 * Hook run once a worker has answered a task execution.
 * Can be overridden.
 *
 * Updates the task counters, run time and ELU statistics of the worker node
 * usage and, when task function usage shall be updated, of the usage kept for
 * the task function named in the message task performance.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  // Applies the three statistics updates, in order, to one usage object.
  const recordTaskResult = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  const nodeUsage = this.workerNodes[workerNodeKey]?.usage
  if (nodeUsage != null) {
    recordTaskResult(nodeUsage)
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const functionUsage = this.workerNodes[
    workerNodeKey
  ].getTaskFunctionWorkerUsage(message.taskPerformance?.name as string)
  if (functionUsage != null) {
    recordTaskResult(functionUsage)
  }
}
967
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * Returns `false` when no worker node exists at the given key, so callers that
 * already tolerate a missing worker node do not hit a `TypeError` here.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  // Optional chaining keeps the `!= null` check below meaningful: reading
  // `.info` unguarded on a missing worker node would throw before reaching it.
  const workerInfo = this.workerNodes[workerNodeKey]?.info
  return (
    workerInfo != null &&
    Array.isArray(workerInfo.taskFunctionNames) &&
    // NOTE(review): the threshold is "more than two names"; presumably the list
    // also carries the default task function entries — confirm on the worker side.
    workerInfo.taskFunctionNames.length > 2
  )
}
982
/**
 * Updates the task counters of a worker usage after a task execution response.
 *
 * Decrements the executing counter when it is positive, then increments either
 * the failed counter (the message carries a worker error) or the executed one.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const taskStatistics = workerUsage.tasks
  if (taskStatistics.executing != null && taskStatistics.executing > 0) {
    --taskStatistics.executing
  }
  if (message.workerError != null) {
    ++taskStatistics.failed
  } else {
    ++taskStatistics.executed
  }
}
1000
/**
 * Updates the run time statistics of a worker usage from a task execution response.
 * Nothing is recorded when the message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError == null) {
    updateMeasurementStatistics(
      workerUsage.runTime,
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
      // A missing run time measurement is recorded as 0.
      message.taskPerformance?.runTime ?? 0
    )
  }
}
1014
/**
 * Updates the wait time statistics of a worker usage for a task about to be executed.
 *
 * The wait time is the delay between the task timestamp and now; a task
 * without timestamp is recorded with a wait time of 0.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task to execute.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const waitTime = now - (task.timestamp ?? now)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    waitTime
  )
}
1027
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage from a
 * task execution response. Nothing is recorded when the message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError != null) {
    return
  }
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  // Missing active/idle measurements are recorded as 0.
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0
  )
  if (eluRequirements.aggregate && taskElu != null) {
    // Utilization is the mean of the stored value and the new one, or the new
    // value when nothing is stored yet.
    workerUsage.elu.utilization =
      workerUsage.elu.utilization != null
        ? (workerUsage.elu.utilization + taskElu.utilization) / 2
        : taskElu.utilization
  }
}
1060
/**
 * Chooses a worker node for the next task.
 *
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 * When a dynamic worker is created and the strategy policy enables dynamic
 * worker usage, the new worker node is chosen directly.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1079
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and
 * internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic || this.full) {
    return false
  }
  return this.internalBusy()
}
1088
/**
 * Sends a message to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message to send.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1101
/**
 * Creates a new worker.
 *
 * @returns The newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 1108
/**
 * Creates a new, completely set up worker node.
 *
 * The user supplied handlers from the pool options are registered on the
 * worker, followed by the pool's own error and exit handling; the worker is
 * then added to the worker nodes and the post-setup hook is run.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    // Flag the worker node as not ready and close its channel before notifying.
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    const shallRestart =
      this.opts.restartWorkerOnError === true && this.started && !this.starting
    if (shallRestart && failedWorkerInfo.dynamic) {
      this.createAndSetupDynamicWorkerNode()
    } else if (shallRestart) {
      this.createAndSetupWorkerNode()
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const addedWorkerNodeKey = this.addWorkerNode(worker)

  this.afterWorkerNodeSetup(addedWorkerNodeKey)

  return addedWorkerNodeKey
}
be0676b3 1152
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular setup, a message listener destroys the worker node
 * when the worker sends a kill message, the worker is asked to check its
 * activity, and the worker node is flagged as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    // A soft kill is honoured only when no task is executing and, with the
    // tasks queue enabled, when the worker node queue is empty as well.
    const softKillAllowed = (): boolean => {
      const noTaskExecuting = workerUsage.tasks.executing === 0
      return (
        (this.opts.enableTasksQueue === false && noTaskExecuting) ||
        (this.opts.enableTasksQueue === true &&
          noTaskExecuting &&
          this.tasksQueueSize(localWorkerNodeKey) === 0)
      )
    }
    // Kill message received from worker
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && softKillAllowed())
    ) {
      this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: workerInfo.id as number
  })
  workerInfo.dynamic = true
  const strategyPolicy = this.workerChoiceStrategyContext.getStrategyPolicy()
  if (strategyPolicy.dynamicWorkerReady || strategyPolicy.dynamicWorkerUsage) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1195
/**
 * Registers a message listener callback on the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053
JB
1208
/**
 * Method hooked up after a worker node has been newly created.
 * Can be overridden.
 *
 * Registers the pool message listener, sends the startup and statistics
 * messages and, when the tasks queue is enabled, wires the task stealing callbacks.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
}
1229
/**
 * Sends the startup message to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1236
/**
 * Sends the statistics message to the worker identified by its worker node key.
 *
 * The message carries the `aggregate` flags of the run time and ELU task
 * statistics requirements of the worker choice strategy context.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const taskStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: taskStatisticsRequirements.runTime.aggregate,
      elu: taskStatisticsRequirements.elu.aggregate
    },
    workerId: this.getWorkerInfo(workerNodeKey).id as number
  })
}
a2ed5053
JB
1254
/**
 * Moves every task queued on the given worker node to other worker nodes.
 *
 * For each queued task, the destination is the ready worker node with the
 * fewest queued tasks (search seeded with worker node key 0); the task is
 * executed right away when the destination allows it, enqueued otherwise.
 *
 * NOTE(review): nothing here excludes the source worker node from being picked
 * as destination — confirm that another eligible worker node always exists.
 *
 * @param workerNodeKey - The worker node key whose queue is emptied.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = 0
    for (const [candidateKey, candidate] of this.workerNodes.entries()) {
      if (
        candidate.info.ready &&
        candidate.usage.tasks.queued <
          this.workerNodes[destinationWorkerNodeKey].usage.tasks.queued
      ) {
        destinationWorkerNodeKey = candidateKey
      }
    }
    const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
    // Re-address the task to the destination worker.
    const redistributedTask = {
      ...(this.dequeueTask(workerNodeKey) as Task<Data>),
      workerId: destinationWorkerNode.info.id as number
    }
    if (this.shallExecuteTask(destinationWorkerNodeKey)) {
      this.executeTask(destinationWorkerNodeKey, redistributedTask)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, redistributedTask)
    }
  }
}
1279
/**
 * Increments the stolen tasks counter of a worker node and, when task function
 * usage shall be updated, of the usage kept for the given task function.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    ++workerNode.usage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const functionUsage = workerNode.getTaskFunctionWorkerUsage(taskName)
  if (functionUsage != null) {
    ++functionUsage.tasks.stolen
  }
}
1298
/**
 * Steals one task for the worker whose queue became empty.
 *
 * The source is the ready worker node, other than the given worker, with the
 * most queued tasks (at least one). The task popped from it is executed on the
 * given worker when allowed, enqueued on it otherwise.
 *
 * @param workerId - The id of the worker whose queue is empty.
 */
private taskStealingOnEmptyQueue (workerId: number): void {
  const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
  // Sort a copy, most queued tasks first, leaving the pool worker nodes untouched.
  const byQueuedTasksDescending = [...this.workerNodes].sort(
    (workerNodeA, workerNodeB) =>
      workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
  )
  const sourceWorkerNode = byQueuedTasksDescending.find(
    candidate =>
      candidate.info.ready &&
      candidate.info.id !== workerId &&
      candidate.usage.tasks.queued > 0
  )
  if (sourceWorkerNode == null) {
    return
  }
  const stolenTask = {
    ...(sourceWorkerNode.popTask() as Task<Data>),
    workerId: destinationWorkerNode.info.id as number
  }
  if (this.shallExecuteTask(destinationWorkerNodeKey)) {
    this.executeTask(destinationWorkerNodeKey, stolenTask)
  } else {
    this.enqueueTask(destinationWorkerNodeKey, stolenTask)
  }
  this.updateTaskStolenStatisticsWorkerUsage(
    destinationWorkerNodeKey,
    stolenTask.name as string
  )
}
1330
/**
 * Offloads queued tasks from the worker under back pressure to other worker nodes.
 *
 * Candidates are visited from the least to the most loaded. Each ready worker
 * node, other than the source, whose queue holds fewer tasks than the
 * configured tasks queue size minus one receives one task popped from the
 * source, as long as the source still has queued tasks. Nothing is done when
 * the configured tasks queue size is at most one.
 *
 * @param workerId - The id of the worker under back pressure.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const sizeOffset = 1
  const tasksQueueMaxSize = this.opts.tasksQueueOptions?.size as number
  if (tasksQueueMaxSize <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // Sort a copy, fewest queued tasks first, leaving the pool worker nodes untouched.
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued < tasksQueueMaxSize - sizeOffset
    ) {
      // The position in the sorted copy is not a valid worker node key:
      // resolve the key against the pool worker nodes, which is what
      // executeTask(), enqueueTask() and the usage update index into.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      const task = {
        ...(sourceWorkerNode.popTask() as Task<Data>),
        workerId: workerNode.info.id as number
      }
      if (this.shallExecuteTask(workerNodeKey)) {
        this.executeTask(workerNodeKey, task)
      } else {
        this.enqueueTask(workerNodeKey, task)
      }
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
    }
  }
}
1368
/**
 * Builds the listener registered for each worker message.
 *
 * After checking the message worker id, the listener dispatches on the message
 * content: worker ready response, task execution response, or task function
 * names update.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    const { ready, taskId, taskFunctionNames, taskFunctionOperation } = message
    if (ready != null && taskFunctionNames != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctionNames != null) {
      // Task function names message received from worker
      this.getWorkerInfo(
        this.getWorkerNodeKeyByWorkerId(message.workerId)
      ).taskFunctionNames = taskFunctionNames
      return
    }
    if (taskFunctionOperation != null) {
      // Task function operation response received from worker
    }
  }
}
1393
/**
 * Handles the ready response of a worker: stores its ready flag and task
 * function names, then emits the pool ready event once the pool is ready.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports it is not ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, taskFunctionNames, workerId } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = ready as boolean
  workerInfo.taskFunctionNames = taskFunctionNames
  if (this.emitter != null && this.ready) {
    this.emitter.emit(PoolEvents.ready, this.info)
  }
}
1407
/**
 * Handles the response of a worker to a task execution.
 *
 * Settles the promise registered for the task id, updates the worker usage and
 * the worker choice strategy, then starts the next queued task of the worker
 * node when the tasks queue is enabled and its concurrency allows it.
 * Messages whose task id has no registered promise are ignored.
 *
 * @param message - The received message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { taskId, workerError, data } = message
  const pendingResponse = this.promiseResponseMap.get(taskId as string)
  if (pendingResponse == null) {
    return
  }
  if (workerError == null) {
    pendingResponse.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    pendingResponse.reject(workerError.message)
  }
  const { workerNodeKey } = pendingResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.promiseResponseMap.delete(taskId as string)
  const shallExecuteQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (shallExecuteQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
}
7c0ba920 1435
/**
 * Emits the pool busy event, with the pool information, when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1441
/**
 * Emits the pool back pressure event, with the pool information, when the pool
 * has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1447
/**
 * Emits the pool full event, with the pool information, when the pool is
 * dynamic and full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1455
/**
 * Gets the worker information of the worker node at the given key.
 *
 * The worker node is dereferenced without guard: a key with no worker node
 * makes this method throw.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1465
/**
 * Wraps the given worker into a worker node and adds it to the pool worker nodes.
 *
 * The worker node tasks queue size comes from the tasks queue options,
 * defaulting to the square of the pool maximum size.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const tasksQueueMaxSize =
    this.opts.tasksQueueOptions?.size ?? this.maxSize ** 2
  const workerNode = new WorkerNode<Worker, Data>(worker, tasksQueueMaxSize)
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    workerNode.info.ready = true
  }
  this.workerNodes.push(workerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
c923ce56 1489
/**
 * Removes the given worker from the pool worker nodes and from the worker
 * choice strategy context. Does nothing when the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
adc3c320 1502
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Back pressure only exists when the tasks queue is enabled.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1510
/**
 * Whether the pool has back pressure: the tasks queue is enabled and no worker
 * node is free of back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1519
/**
 * Executes the given task on the worker identified by its worker node key.
 *
 * Runs the pre-execution hook, sends the task (with its transfer list) to the
 * worker, then emits the task execution events when applicable.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1531
/**
 * Enqueues the given task on the worker node at the given key, then emits the
 * task queuing events when applicable.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node when enqueuing the task.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSizeAfterEnqueue = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSizeAfterEnqueue
}
1537
/**
 * Dequeues a task from the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1541
/**
 * Gets the tasks queue size of the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1545
/**
 * Executes every task queued on the worker node at the given key, then clears
 * its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.clearTasksQueue()
}
1555
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 1561}