docs: refine API documentation
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
ef083f7b 3import type { TransferListItem } from 'node:worker_threads'
f80125ca 4import { EventEmitterAsyncResource } from 'node:events'
f18fd12b 5import { AsyncResource } from 'node:async_hooks'
5c4d16da
JB
6import type {
7 MessageValue,
8 PromiseResponseWrapper,
76d91ea0 9 Task
5c4d16da 10} from '../utility-types'
bbeadd16 11import {
ff128cc9 12 DEFAULT_TASK_NAME,
bbeadd16 13 EMPTY_FUNCTION,
dc021bcc 14 average,
463226a4 15 exponentialDelay,
59317253 16 isKillBehavior,
0d80593b 17 isPlainObject,
90d6701c 18 max,
afe0d5bf 19 median,
90d6701c 20 min,
463226a4
JB
21 round,
22 sleep
bbeadd16 23} from '../utils'
59317253 24import { KillBehaviors } from '../worker/worker-options'
6703b9f4 25import type { TaskFunction } from '../worker/task-functions'
c4855468 26import {
65d7a1c9 27 type IPool,
c4855468 28 PoolEvents,
6b27d407 29 type PoolInfo,
c4855468 30 type PoolOptions,
6b27d407
JB
31 type PoolType,
32 PoolTypes,
4b628b48 33 type TasksQueueOptions
c4855468 34} from './pool'
4d159167
JB
35import type {
36 IWorker,
37 IWorkerNode,
f1f77f45 38 TaskStatistics,
4d159167 39 WorkerInfo,
9f95d5eb 40 WorkerNodeEventDetail,
4d159167
JB
41 WorkerType,
42 WorkerUsage
e102732c 43} from './worker'
a35560ba 44import {
f0d7f803 45 Measurements,
a35560ba 46 WorkerChoiceStrategies,
a20f0ba5
JB
47 type WorkerChoiceStrategy,
48 type WorkerChoiceStrategyOptions
bdaf31cd
JB
49} from './selection-strategies/selection-strategies-types'
50import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 51import { version } from './version'
4b628b48 52import { WorkerNode } from './worker-node'
bde6b5d7
JB
53import {
54 checkFilePath,
55 checkValidTasksQueueOptions,
bfc75cca 56 checkValidWorkerChoiceStrategy,
32b141fd 57 getDefaultTasksQueueOptions,
c329fd41
JB
58 updateEluWorkerUsage,
59 updateRunTimeWorkerUsage,
60 updateTaskStatisticsWorkerUsage,
61 updateWaitTimeWorkerUsage,
87347ea8 62 waitWorkerNodeEvents
bde6b5d7 63} from './utils'
23ccf9d7 64
729c563d 65/**
ea7a90d3 66 * Base class that implements some shared logic for all poolifier pools.
729c563d 67 *
38e795c1 68 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
69 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
70 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 71 */
c97c7edb 72export abstract class AbstractPool<
f06e48d8 73 Worker extends IWorker,
d3c8a1a8
S
74 Data = unknown,
75 Response = unknown
c4855468 76> implements IPool<Worker, Data, Response> {
afc003b2 77 /** @inheritDoc */
4b628b48 78 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 79
afc003b2 80 /** @inheritDoc */
f80125ca 81 public emitter?: EventEmitterAsyncResource
7c0ba920 82
be0676b3 83 /**
419e8121 84 * The task execution response promise map:
2740a743 85 * - `key`: The message id of each submitted task.
a3445496 86 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 87 *
a3445496 88 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 89 */
501aea93
JB
90 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
91 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 92
a35560ba 93 /**
51fe3d3c 94 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
95 */
96 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
97 Worker,
98 Data,
99 Response
a35560ba
S
100 >
101
72ae84a2
JB
102 /**
103 * The task functions added at runtime map:
104 * - `key`: The task function name.
105 * - `value`: The task function itself.
106 */
107 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
108
15b176e0
JB
109 /**
110 * Whether the pool is started or not.
111 */
112 private started: boolean
47352846
JB
113 /**
114 * Whether the pool is starting or not.
115 */
116 private starting: boolean
711623b8
JB
117 /**
118 * Whether the pool is destroying or not.
119 */
120 private destroying: boolean
55082af9
JB
121 /**
122 * Whether the pool ready event has been emitted or not.
123 */
124 private readyEventEmitted: boolean
afe0d5bf
JB
125 /**
126 * The start timestamp of the pool.
127 */
128 private readonly startTimestamp
129
729c563d
S
130 /**
131 * Constructs a new poolifier pool.
132 *
00e1bdeb 133 * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
029715f0 134 * @param filePath - Path to the worker file.
38e795c1 135 * @param opts - Options for the pool.
00e1bdeb 136 * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
729c563d 137 */
c97c7edb 138 public constructor (
26ce26ca 139 protected readonly minimumNumberOfWorkers: number,
b4213b7f 140 protected readonly filePath: string,
26ce26ca
JB
141 protected readonly opts: PoolOptions<Worker>,
142 protected readonly maximumNumberOfWorkers?: number
c97c7edb 143 ) {
78cea37e 144 if (!this.isMain()) {
04f45163 145 throw new Error(
8c6d4acf 146 'Cannot start a pool from a worker with the same type as the pool'
04f45163 147 )
c97c7edb 148 }
26ce26ca 149 this.checkPoolType()
bde6b5d7 150 checkFilePath(this.filePath)
26ce26ca 151 this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers)
7c0ba920 152 this.checkPoolOptions(this.opts)
1086026a 153
7254e419
JB
154 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
155 this.executeTask = this.executeTask.bind(this)
156 this.enqueueTask = this.enqueueTask.bind(this)
1086026a 157
6bd72cd0 158 if (this.opts.enableEvents === true) {
b5604034 159 this.initializeEventEmitter()
7c0ba920 160 }
d59df138
JB
161 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
162 Worker,
163 Data,
164 Response
da309861
JB
165 >(
166 this,
167 this.opts.workerChoiceStrategy,
168 this.opts.workerChoiceStrategyOptions
169 )
b6b32453
JB
170
171 this.setupHook()
172
72ae84a2
JB
173 this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
174
47352846 175 this.started = false
075e51d1 176 this.starting = false
711623b8 177 this.destroying = false
55082af9 178 this.readyEventEmitted = false
47352846
JB
179 if (this.opts.startWorkers === true) {
180 this.start()
181 }
afe0d5bf
JB
182
183 this.startTimestamp = performance.now()
c97c7edb
S
184 }
185
26ce26ca
JB
186 private checkPoolType (): void {
187 if (this.type === PoolTypes.fixed && this.maximumNumberOfWorkers != null) {
188 throw new Error(
189 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
190 )
191 }
192 }
193
af7f2788
JB
194 private checkMinimumNumberOfWorkers (minimumNumberOfWorkers: number): void {
195 if (minimumNumberOfWorkers == null) {
8d3782fa
JB
196 throw new Error(
197 'Cannot instantiate a pool without specifying the number of workers'
198 )
af7f2788 199 } else if (!Number.isSafeInteger(minimumNumberOfWorkers)) {
473c717a 200 throw new TypeError(
0d80593b 201 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa 202 )
af7f2788 203 } else if (minimumNumberOfWorkers < 0) {
473c717a 204 throw new RangeError(
8d3782fa
JB
205 'Cannot instantiate a pool with a negative number of workers'
206 )
af7f2788 207 } else if (this.type === PoolTypes.fixed && minimumNumberOfWorkers === 0) {
2431bdb4
JB
208 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
209 }
210 }
211
7c0ba920 212 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b 213 if (isPlainObject(opts)) {
47352846 214 this.opts.startWorkers = opts.startWorkers ?? true
bde6b5d7 215 checkValidWorkerChoiceStrategy(
2324f8c9
JB
216 opts.workerChoiceStrategy as WorkerChoiceStrategy
217 )
0d80593b
JB
218 this.opts.workerChoiceStrategy =
219 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
2324f8c9
JB
220 this.checkValidWorkerChoiceStrategyOptions(
221 opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
222 )
26ce26ca
JB
223 if (opts.workerChoiceStrategyOptions != null) {
224 this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
8990357d 225 }
1f68cede 226 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
227 this.opts.enableEvents = opts.enableEvents ?? true
228 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
229 if (this.opts.enableTasksQueue) {
bde6b5d7 230 checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions)
0d80593b
JB
231 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
232 opts.tasksQueueOptions as TasksQueueOptions
233 )
234 }
235 } else {
236 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 237 }
aee46736
JB
238 }
239
0d80593b
JB
240 private checkValidWorkerChoiceStrategyOptions (
241 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
242 ): void {
2324f8c9
JB
243 if (
244 workerChoiceStrategyOptions != null &&
245 !isPlainObject(workerChoiceStrategyOptions)
246 ) {
0d80593b
JB
247 throw new TypeError(
248 'Invalid worker choice strategy options: must be a plain object'
249 )
250 }
49be33fe 251 if (
2324f8c9 252 workerChoiceStrategyOptions?.weights != null &&
26ce26ca
JB
253 Object.keys(workerChoiceStrategyOptions.weights).length !==
254 (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
49be33fe
JB
255 ) {
256 throw new Error(
257 'Invalid worker choice strategy options: must have a weight for each worker node'
258 )
259 }
f0d7f803 260 if (
2324f8c9 261 workerChoiceStrategyOptions?.measurement != null &&
f0d7f803
JB
262 !Object.values(Measurements).includes(
263 workerChoiceStrategyOptions.measurement
264 )
265 ) {
266 throw new Error(
267 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
268 )
269 }
0d80593b
JB
270 }
271
b5604034 272 private initializeEventEmitter (): void {
5b7d00b4 273 this.emitter = new EventEmitterAsyncResource({
b5604034
JB
274 name: `poolifier:${this.type}-${this.worker}-pool`
275 })
276 }
277
08f3f44c 278 /** @inheritDoc */
6b27d407
JB
279 public get info (): PoolInfo {
280 return {
23ccf9d7 281 version,
6b27d407 282 type: this.type,
184855e6 283 worker: this.worker,
47352846 284 started: this.started,
2431bdb4
JB
285 ready: this.ready,
286 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
26ce26ca
JB
287 minSize: this.minimumNumberOfWorkers,
288 maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
289 ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
c05f0d50 290 .runTime.aggregate &&
26ce26ca 291 this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
1305e9a8 292 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
293 workerNodes: this.workerNodes.length,
294 idleWorkerNodes: this.workerNodes.reduce(
295 (accumulator, workerNode) =>
f59e1027 296 workerNode.usage.tasks.executing === 0
a4e07f72
JB
297 ? accumulator + 1
298 : accumulator,
6b27d407
JB
299 0
300 ),
301 busyWorkerNodes: this.workerNodes.reduce(
42c677c1
JB
302 (accumulator, _workerNode, workerNodeKey) =>
303 this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
6b27d407
JB
304 0
305 ),
a4e07f72 306 executedTasks: this.workerNodes.reduce(
6b27d407 307 (accumulator, workerNode) =>
f59e1027 308 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
309 0
310 ),
311 executingTasks: this.workerNodes.reduce(
312 (accumulator, workerNode) =>
f59e1027 313 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
314 0
315 ),
daf86646
JB
316 ...(this.opts.enableTasksQueue === true && {
317 queuedTasks: this.workerNodes.reduce(
318 (accumulator, workerNode) =>
319 accumulator + workerNode.usage.tasks.queued,
320 0
321 )
322 }),
323 ...(this.opts.enableTasksQueue === true && {
324 maxQueuedTasks: this.workerNodes.reduce(
325 (accumulator, workerNode) =>
326 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
327 0
328 )
329 }),
a1763c54
JB
330 ...(this.opts.enableTasksQueue === true && {
331 backPressure: this.hasBackPressure()
332 }),
68cbdc84
JB
333 ...(this.opts.enableTasksQueue === true && {
334 stolenTasks: this.workerNodes.reduce(
335 (accumulator, workerNode) =>
336 accumulator + workerNode.usage.tasks.stolen,
337 0
338 )
339 }),
a4e07f72
JB
340 failedTasks: this.workerNodes.reduce(
341 (accumulator, workerNode) =>
f59e1027 342 accumulator + workerNode.usage.tasks.failed,
a4e07f72 343 0
1dcf8b7b 344 ),
26ce26ca 345 ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
1dcf8b7b
JB
346 .runTime.aggregate && {
347 runTime: {
98e72cda 348 minimum: round(
90d6701c 349 min(
98e72cda 350 ...this.workerNodes.map(
041dc05b 351 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
98e72cda 352 )
1dcf8b7b
JB
353 )
354 ),
98e72cda 355 maximum: round(
90d6701c 356 max(
98e72cda 357 ...this.workerNodes.map(
041dc05b 358 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
98e72cda 359 )
1dcf8b7b 360 )
98e72cda 361 ),
26ce26ca 362 ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
3baa0837
JB
363 .runTime.average && {
364 average: round(
365 average(
366 this.workerNodes.reduce<number[]>(
367 (accumulator, workerNode) =>
368 accumulator.concat(workerNode.usage.runTime.history),
369 []
370 )
98e72cda 371 )
dc021bcc 372 )
3baa0837 373 }),
26ce26ca 374 ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
98e72cda
JB
375 .runTime.median && {
376 median: round(
377 median(
3baa0837
JB
378 this.workerNodes.reduce<number[]>(
379 (accumulator, workerNode) =>
380 accumulator.concat(workerNode.usage.runTime.history),
381 []
98e72cda
JB
382 )
383 )
384 )
385 })
1dcf8b7b
JB
386 }
387 }),
26ce26ca 388 ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
1dcf8b7b
JB
389 .waitTime.aggregate && {
390 waitTime: {
98e72cda 391 minimum: round(
90d6701c 392 min(
98e72cda 393 ...this.workerNodes.map(
041dc05b 394 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
98e72cda 395 )
1dcf8b7b
JB
396 )
397 ),
98e72cda 398 maximum: round(
90d6701c 399 max(
98e72cda 400 ...this.workerNodes.map(
041dc05b 401 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
98e72cda 402 )
1dcf8b7b 403 )
98e72cda 404 ),
26ce26ca 405 ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
3baa0837
JB
406 .waitTime.average && {
407 average: round(
408 average(
409 this.workerNodes.reduce<number[]>(
410 (accumulator, workerNode) =>
411 accumulator.concat(workerNode.usage.waitTime.history),
412 []
413 )
98e72cda 414 )
dc021bcc 415 )
3baa0837 416 }),
26ce26ca 417 ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
98e72cda
JB
418 .waitTime.median && {
419 median: round(
420 median(
3baa0837
JB
421 this.workerNodes.reduce<number[]>(
422 (accumulator, workerNode) =>
423 accumulator.concat(workerNode.usage.waitTime.history),
424 []
98e72cda
JB
425 )
426 )
427 )
428 })
1dcf8b7b
JB
429 }
430 })
6b27d407
JB
431 }
432 }
08f3f44c 433
aa9eede8
JB
434 /**
435 * The pool readiness boolean status.
436 */
2431bdb4
JB
437 private get ready (): boolean {
438 return (
b97d82d8
JB
439 this.workerNodes.reduce(
440 (accumulator, workerNode) =>
441 !workerNode.info.dynamic && workerNode.info.ready
442 ? accumulator + 1
443 : accumulator,
444 0
26ce26ca 445 ) >= this.minimumNumberOfWorkers
2431bdb4
JB
446 )
447 }
448
afe0d5bf 449 /**
aa9eede8 450 * The approximate pool utilization.
afe0d5bf
JB
451 *
452 * @returns The pool utilization.
453 */
454 private get utilization (): number {
8e5ca040 455 const poolTimeCapacity =
26ce26ca
JB
456 (performance.now() - this.startTimestamp) *
457 (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
afe0d5bf
JB
458 const totalTasksRunTime = this.workerNodes.reduce(
459 (accumulator, workerNode) =>
71514351 460 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
461 0
462 )
463 const totalTasksWaitTime = this.workerNodes.reduce(
464 (accumulator, workerNode) =>
71514351 465 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
466 0
467 )
8e5ca040 468 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
469 }
470
8881ae32 471 /**
aa9eede8 472 * The pool type.
8881ae32
JB
473 *
474 * If it is `'dynamic'`, it provides the `max` property.
475 */
476 protected abstract get type (): PoolType
477
184855e6 478 /**
aa9eede8 479 * The worker type.
184855e6
JB
480 */
481 protected abstract get worker (): WorkerType
482
6b813701
JB
483 /**
484 * Checks if the worker id sent in the received message from a worker is valid.
485 *
486 * @param message - The received message.
487 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
488 */
4d159167 489 private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
310de0aa
JB
490 if (message.workerId == null) {
491 throw new Error('Worker message received without worker id')
8e5a7cf9 492 } else if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) {
21f710aa
JB
493 throw new Error(
494 `Worker message received from unknown worker '${message.workerId}'`
495 )
496 }
497 }
498
aa9eede8
JB
499 /**
500 * Gets the worker node key given its worker id.
501 *
502 * @param workerId - The worker id.
aad6fb64 503 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
aa9eede8 504 */
72ae84a2 505 private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
aad6fb64 506 return this.workerNodes.findIndex(
041dc05b 507 workerNode => workerNode.info.id === workerId
aad6fb64 508 )
aa9eede8
JB
509 }
510
afc003b2 511 /** @inheritDoc */
a35560ba 512 public setWorkerChoiceStrategy (
59219cbb
JB
513 workerChoiceStrategy: WorkerChoiceStrategy,
514 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 515 ): void {
bde6b5d7 516 checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 517 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
518 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
519 this.opts.workerChoiceStrategy
520 )
521 if (workerChoiceStrategyOptions != null) {
522 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
523 }
aa9eede8 524 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
4b628b48 525 workerNode.resetUsage()
9edb9717 526 this.sendStatisticsMessageToWorker(workerNodeKey)
59219cbb 527 }
a20f0ba5
JB
528 }
529
530 /** @inheritDoc */
531 public setWorkerChoiceStrategyOptions (
532 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
533 ): void {
0d80593b 534 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
26ce26ca
JB
535 if (workerChoiceStrategyOptions != null) {
536 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
8990357d 537 }
a20f0ba5 538 this.workerChoiceStrategyContext.setOptions(
26ce26ca 539 this,
a20f0ba5 540 this.opts.workerChoiceStrategyOptions
a35560ba
S
541 )
542 }
543
a20f0ba5 544 /** @inheritDoc */
8f52842f
JB
545 public enableTasksQueue (
546 enable: boolean,
547 tasksQueueOptions?: TasksQueueOptions
548 ): void {
a20f0ba5 549 if (this.opts.enableTasksQueue === true && !enable) {
d6ca1416
JB
550 this.unsetTaskStealing()
551 this.unsetTasksStealingOnBackPressure()
ef41a6e6 552 this.flushTasksQueues()
a20f0ba5
JB
553 }
554 this.opts.enableTasksQueue = enable
8f52842f 555 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
556 }
557
558 /** @inheritDoc */
8f52842f 559 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 560 if (this.opts.enableTasksQueue === true) {
bde6b5d7 561 checkValidTasksQueueOptions(tasksQueueOptions)
8f52842f
JB
562 this.opts.tasksQueueOptions =
563 this.buildTasksQueueOptions(tasksQueueOptions)
5b49e864 564 this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
d6ca1416 565 if (this.opts.tasksQueueOptions.taskStealing === true) {
9f99eb9b 566 this.unsetTaskStealing()
d6ca1416
JB
567 this.setTaskStealing()
568 } else {
569 this.unsetTaskStealing()
570 }
571 if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
9f99eb9b 572 this.unsetTasksStealingOnBackPressure()
d6ca1416
JB
573 this.setTasksStealingOnBackPressure()
574 } else {
575 this.unsetTasksStealingOnBackPressure()
576 }
5baee0d7 577 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
578 delete this.opts.tasksQueueOptions
579 }
580 }
581
9b38ab2d
JB
582 private buildTasksQueueOptions (
583 tasksQueueOptions: TasksQueueOptions
584 ): TasksQueueOptions {
585 return {
26ce26ca
JB
586 ...getDefaultTasksQueueOptions(
587 this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
588 ),
9b38ab2d
JB
589 ...tasksQueueOptions
590 }
591 }
592
5b49e864 593 private setTasksQueueSize (size: number): void {
20c6f652 594 for (const workerNode of this.workerNodes) {
ff3f866a 595 workerNode.tasksQueueBackPressureSize = size
20c6f652
JB
596 }
597 }
598
d6ca1416
JB
599 private setTaskStealing (): void {
600 for (const [workerNodeKey] of this.workerNodes.entries()) {
e1c2dba7 601 this.workerNodes[workerNodeKey].on(
65542a35 602 'idleWorkerNode',
e1c2dba7 603 this.handleIdleWorkerNodeEvent
9f95d5eb 604 )
d6ca1416
JB
605 }
606 }
607
608 private unsetTaskStealing (): void {
609 for (const [workerNodeKey] of this.workerNodes.entries()) {
e1c2dba7 610 this.workerNodes[workerNodeKey].off(
65542a35 611 'idleWorkerNode',
e1c2dba7 612 this.handleIdleWorkerNodeEvent
9f95d5eb 613 )
d6ca1416
JB
614 }
615 }
616
617 private setTasksStealingOnBackPressure (): void {
618 for (const [workerNodeKey] of this.workerNodes.entries()) {
e1c2dba7 619 this.workerNodes[workerNodeKey].on(
b5e75be8 620 'backPressure',
e1c2dba7 621 this.handleBackPressureEvent
9f95d5eb 622 )
d6ca1416
JB
623 }
624 }
625
626 private unsetTasksStealingOnBackPressure (): void {
627 for (const [workerNodeKey] of this.workerNodes.entries()) {
e1c2dba7 628 this.workerNodes[workerNodeKey].off(
b5e75be8 629 'backPressure',
e1c2dba7 630 this.handleBackPressureEvent
9f95d5eb 631 )
d6ca1416
JB
632 }
633 }
634
c319c66b
JB
635 /**
636 * Whether the pool is full or not.
637 *
638 * The pool filling boolean status.
639 */
dea903a8 640 protected get full (): boolean {
26ce26ca
JB
641 return (
642 this.workerNodes.length >=
643 (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
644 )
dea903a8 645 }
c2ade475 646
c319c66b
JB
647 /**
648 * Whether the pool is busy or not.
649 *
650 * The pool busyness boolean status.
651 */
652 protected abstract get busy (): boolean
7c0ba920 653
6c6afb84 654 /**
3d76750a 655 * Whether worker nodes are executing concurrently their tasks quota or not.
6c6afb84
JB
656 *
657 * @returns Worker nodes busyness boolean status.
658 */
c2ade475 659 protected internalBusy (): boolean {
3d76750a
JB
660 if (this.opts.enableTasksQueue === true) {
661 return (
662 this.workerNodes.findIndex(
041dc05b 663 workerNode =>
3d76750a
JB
664 workerNode.info.ready &&
665 workerNode.usage.tasks.executing <
666 (this.opts.tasksQueueOptions?.concurrency as number)
667 ) === -1
668 )
3d76750a 669 }
419e8121
JB
670 return (
671 this.workerNodes.findIndex(
672 workerNode =>
673 workerNode.info.ready && workerNode.usage.tasks.executing === 0
674 ) === -1
675 )
cb70b19d
JB
676 }
677
42c677c1
JB
678 private isWorkerNodeBusy (workerNodeKey: number): boolean {
679 if (this.opts.enableTasksQueue === true) {
680 return (
681 this.workerNodes[workerNodeKey].usage.tasks.executing >=
682 (this.opts.tasksQueueOptions?.concurrency as number)
683 )
684 }
685 return this.workerNodes[workerNodeKey].usage.tasks.executing > 0
686 }
687
e81c38f2 688 private async sendTaskFunctionOperationToWorker (
72ae84a2
JB
689 workerNodeKey: number,
690 message: MessageValue<Data>
691 ): Promise<boolean> {
72ae84a2 692 return await new Promise<boolean>((resolve, reject) => {
ae036c3e
JB
693 const taskFunctionOperationListener = (
694 message: MessageValue<Response>
695 ): void => {
696 this.checkMessageWorkerId(message)
697 const workerId = this.getWorkerInfo(workerNodeKey).id as number
72ae84a2 698 if (
ae036c3e
JB
699 message.taskFunctionOperationStatus != null &&
700 message.workerId === workerId
72ae84a2 701 ) {
ae036c3e
JB
702 if (message.taskFunctionOperationStatus) {
703 resolve(true)
704 } else if (!message.taskFunctionOperationStatus) {
705 reject(
706 new Error(
707 `Task function operation '${
708 message.taskFunctionOperation as string
709 }' failed on worker ${message.workerId} with error: '${
710 message.workerError?.message as string
711 }'`
712 )
72ae84a2 713 )
ae036c3e
JB
714 }
715 this.deregisterWorkerMessageListener(
716 this.getWorkerNodeKeyByWorkerId(message.workerId),
717 taskFunctionOperationListener
72ae84a2
JB
718 )
719 }
ae036c3e
JB
720 }
721 this.registerWorkerMessageListener(
722 workerNodeKey,
723 taskFunctionOperationListener
724 )
72ae84a2
JB
725 this.sendToWorker(workerNodeKey, message)
726 })
727 }
728
729 private async sendTaskFunctionOperationToWorkers (
adee6053 730 message: MessageValue<Data>
e81c38f2
JB
731 ): Promise<boolean> {
732 return await new Promise<boolean>((resolve, reject) => {
ae036c3e
JB
733 const responsesReceived = new Array<MessageValue<Response>>()
734 const taskFunctionOperationsListener = (
735 message: MessageValue<Response>
736 ): void => {
737 this.checkMessageWorkerId(message)
738 if (message.taskFunctionOperationStatus != null) {
739 responsesReceived.push(message)
740 if (responsesReceived.length === this.workerNodes.length) {
e81c38f2 741 if (
e81c38f2
JB
742 responsesReceived.every(
743 message => message.taskFunctionOperationStatus === true
744 )
745 ) {
746 resolve(true)
747 } else if (
e81c38f2
JB
748 responsesReceived.some(
749 message => message.taskFunctionOperationStatus === false
750 )
751 ) {
b0b55f57
JB
752 const errorResponse = responsesReceived.find(
753 response => response.taskFunctionOperationStatus === false
754 )
e81c38f2
JB
755 reject(
756 new Error(
b0b55f57 757 `Task function operation '${
e81c38f2 758 message.taskFunctionOperation as string
b0b55f57
JB
759 }' failed on worker ${
760 errorResponse?.workerId as number
761 } with error: '${
762 errorResponse?.workerError?.message as string
763 }'`
e81c38f2
JB
764 )
765 )
766 }
ae036c3e
JB
767 this.deregisterWorkerMessageListener(
768 this.getWorkerNodeKeyByWorkerId(message.workerId),
769 taskFunctionOperationsListener
770 )
e81c38f2 771 }
ae036c3e
JB
772 }
773 }
774 for (const [workerNodeKey] of this.workerNodes.entries()) {
775 this.registerWorkerMessageListener(
776 workerNodeKey,
777 taskFunctionOperationsListener
778 )
72ae84a2 779 this.sendToWorker(workerNodeKey, message)
e81c38f2
JB
780 }
781 })
6703b9f4
JB
782 }
783
784 /** @inheritDoc */
785 public hasTaskFunction (name: string): boolean {
edbc15c6
JB
786 for (const workerNode of this.workerNodes) {
787 if (
788 Array.isArray(workerNode.info.taskFunctionNames) &&
789 workerNode.info.taskFunctionNames.includes(name)
790 ) {
791 return true
792 }
793 }
794 return false
6703b9f4
JB
795 }
796
797 /** @inheritDoc */
e81c38f2
JB
798 public async addTaskFunction (
799 name: string,
3feeab69 800 fn: TaskFunction<Data, Response>
e81c38f2 801 ): Promise<boolean> {
3feeab69
JB
802 if (typeof name !== 'string') {
803 throw new TypeError('name argument must be a string')
804 }
805 if (typeof name === 'string' && name.trim().length === 0) {
806 throw new TypeError('name argument must not be an empty string')
807 }
808 if (typeof fn !== 'function') {
809 throw new TypeError('fn argument must be a function')
810 }
adee6053 811 const opResult = await this.sendTaskFunctionOperationToWorkers({
6703b9f4
JB
812 taskFunctionOperation: 'add',
813 taskFunctionName: name,
3feeab69 814 taskFunction: fn.toString()
6703b9f4 815 })
adee6053
JB
816 this.taskFunctions.set(name, fn)
817 return opResult
6703b9f4
JB
818 }
819
820 /** @inheritDoc */
e81c38f2 821 public async removeTaskFunction (name: string): Promise<boolean> {
9eae3c69
JB
822 if (!this.taskFunctions.has(name)) {
823 throw new Error(
16248b23 824 'Cannot remove a task function not handled on the pool side'
9eae3c69
JB
825 )
826 }
adee6053 827 const opResult = await this.sendTaskFunctionOperationToWorkers({
6703b9f4
JB
828 taskFunctionOperation: 'remove',
829 taskFunctionName: name
830 })
adee6053
JB
831 this.deleteTaskFunctionWorkerUsages(name)
832 this.taskFunctions.delete(name)
833 return opResult
6703b9f4
JB
834 }
835
90d7d101 836 /** @inheritDoc */
6703b9f4 837 public listTaskFunctionNames (): string[] {
f2dbbf95
JB
838 for (const workerNode of this.workerNodes) {
839 if (
6703b9f4
JB
840 Array.isArray(workerNode.info.taskFunctionNames) &&
841 workerNode.info.taskFunctionNames.length > 0
f2dbbf95 842 ) {
6703b9f4 843 return workerNode.info.taskFunctionNames
f2dbbf95 844 }
90d7d101 845 }
f2dbbf95 846 return []
90d7d101
JB
847 }
848
6703b9f4 849 /** @inheritDoc */
e81c38f2 850 public async setDefaultTaskFunction (name: string): Promise<boolean> {
72ae84a2 851 return await this.sendTaskFunctionOperationToWorkers({
6703b9f4
JB
852 taskFunctionOperation: 'default',
853 taskFunctionName: name
854 })
6703b9f4
JB
855 }
856
adee6053
JB
857 private deleteTaskFunctionWorkerUsages (name: string): void {
858 for (const workerNode of this.workerNodes) {
859 workerNode.deleteTaskFunctionWorkerUsage(name)
860 }
861 }
862
375f7504
JB
863 private shallExecuteTask (workerNodeKey: number): boolean {
864 return (
865 this.tasksQueueSize(workerNodeKey) === 0 &&
866 this.workerNodes[workerNodeKey].usage.tasks.executing <
867 (this.opts.tasksQueueOptions?.concurrency as number)
868 )
869 }
870
afc003b2 871 /** @inheritDoc */
7d91a8cd
JB
872 public async execute (
873 data?: Data,
874 name?: string,
875 transferList?: TransferListItem[]
876 ): Promise<Response> {
52b71763 877 return await new Promise<Response>((resolve, reject) => {
15b176e0 878 if (!this.started) {
47352846 879 reject(new Error('Cannot execute a task on not started pool'))
9d2d0da1 880 return
15b176e0 881 }
711623b8
JB
882 if (this.destroying) {
883 reject(new Error('Cannot execute a task on destroying pool'))
884 return
885 }
7d91a8cd
JB
886 if (name != null && typeof name !== 'string') {
887 reject(new TypeError('name argument must be a string'))
9d2d0da1 888 return
7d91a8cd 889 }
90d7d101
JB
890 if (
891 name != null &&
892 typeof name === 'string' &&
893 name.trim().length === 0
894 ) {
f58b60b9 895 reject(new TypeError('name argument must not be an empty string'))
9d2d0da1 896 return
90d7d101 897 }
b558f6b5
JB
898 if (transferList != null && !Array.isArray(transferList)) {
899 reject(new TypeError('transferList argument must be an array'))
9d2d0da1 900 return
b558f6b5
JB
901 }
902 const timestamp = performance.now()
903 const workerNodeKey = this.chooseWorkerNode()
501aea93 904 const task: Task<Data> = {
52b71763
JB
905 name: name ?? DEFAULT_TASK_NAME,
906 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
907 data: data ?? ({} as Data),
7d91a8cd 908 transferList,
52b71763 909 timestamp,
7629bdf1 910 taskId: randomUUID()
52b71763 911 }
7629bdf1 912 this.promiseResponseMap.set(task.taskId as string, {
2e81254d
JB
913 resolve,
914 reject,
f18fd12b
JB
915 workerNodeKey,
916 ...(this.emitter != null && {
917 asyncResource: new AsyncResource('poolifier:task', {
918 triggerAsyncId: this.emitter.asyncId,
919 requireManualDestroy: true
920 })
921 })
2e81254d 922 })
52b71763 923 if (
4e377863
JB
924 this.opts.enableTasksQueue === false ||
925 (this.opts.enableTasksQueue === true &&
375f7504 926 this.shallExecuteTask(workerNodeKey))
52b71763 927 ) {
501aea93 928 this.executeTask(workerNodeKey, task)
4e377863
JB
929 } else {
930 this.enqueueTask(workerNodeKey, task)
52b71763 931 }
2e81254d 932 })
280c2a77 933 }
c97c7edb 934
47352846
JB
935 /** @inheritdoc */
936 public start (): void {
711623b8
JB
937 if (this.started) {
938 throw new Error('Cannot start an already started pool')
939 }
940 if (this.starting) {
941 throw new Error('Cannot start an already starting pool')
942 }
943 if (this.destroying) {
944 throw new Error('Cannot start a destroying pool')
945 }
47352846
JB
946 this.starting = true
947 while (
948 this.workerNodes.reduce(
949 (accumulator, workerNode) =>
950 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
951 0
26ce26ca 952 ) < this.minimumNumberOfWorkers
47352846
JB
953 ) {
954 this.createAndSetupWorkerNode()
955 }
956 this.starting = false
957 this.started = true
958 }
959
afc003b2 960 /** @inheritDoc */
c97c7edb 961 public async destroy (): Promise<void> {
711623b8
JB
962 if (!this.started) {
963 throw new Error('Cannot destroy an already destroyed pool')
964 }
965 if (this.starting) {
966 throw new Error('Cannot destroy an starting pool')
967 }
968 if (this.destroying) {
969 throw new Error('Cannot destroy an already destroying pool')
970 }
971 this.destroying = true
1fbcaa7c 972 await Promise.all(
14c39df1 973 this.workerNodes.map(async (_workerNode, workerNodeKey) => {
aa9eede8 974 await this.destroyWorkerNode(workerNodeKey)
1fbcaa7c
JB
975 })
976 )
33e6bb4c 977 this.emitter?.emit(PoolEvents.destroy, this.info)
f80125ca 978 this.emitter?.emitDestroy()
78f60f82 979 this.emitter?.removeAllListeners()
55082af9 980 this.readyEventEmitted = false
711623b8 981 this.destroying = false
15b176e0 982 this.started = false
c97c7edb
S
983 }
984
26ce26ca 985 private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
9edb9717 986 await new Promise<void>((resolve, reject) => {
49beedcb 987 if (this.workerNodes?.[workerNodeKey] == null) {
ad3836ed 988 resolve()
85b2561d
JB
989 return
990 }
ae036c3e
JB
991 const killMessageListener = (message: MessageValue<Response>): void => {
992 this.checkMessageWorkerId(message)
1e3214b6
JB
993 if (message.kill === 'success') {
994 resolve()
995 } else if (message.kill === 'failure') {
72ae84a2
JB
996 reject(
997 new Error(
ae036c3e 998 `Kill message handling failed on worker ${
72ae84a2 999 message.workerId as number
ae036c3e 1000 }`
72ae84a2
JB
1001 )
1002 )
1e3214b6 1003 }
ae036c3e 1004 }
51d9cfbd 1005 // FIXME: should be registered only once
ae036c3e 1006 this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
72ae84a2 1007 this.sendToWorker(workerNodeKey, { kill: true })
1e3214b6 1008 })
1e3214b6
JB
1009 }
1010
4a6952ff 1011 /**
aa9eede8 1012 * Terminates the worker node given its worker node key.
4a6952ff 1013 *
aa9eede8 1014 * @param workerNodeKey - The worker node key.
4a6952ff 1015 */
07e0c9e5
JB
1016 protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
1017 this.flagWorkerNodeAsNotReady(workerNodeKey)
87347ea8 1018 const flushedTasks = this.flushTasksQueue(workerNodeKey)
07e0c9e5 1019 const workerNode = this.workerNodes[workerNodeKey]
32b141fd
JB
1020 await waitWorkerNodeEvents(
1021 workerNode,
1022 'taskFinished',
1023 flushedTasks,
1024 this.opts.tasksQueueOptions?.tasksFinishedTimeout ??
26ce26ca
JB
1025 getDefaultTasksQueueOptions(
1026 this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
1027 ).tasksFinishedTimeout
32b141fd 1028 )
07e0c9e5
JB
1029 await this.sendKillMessageToWorker(workerNodeKey)
1030 await workerNode.terminate()
1031 }
c97c7edb 1032
729c563d 1033 /**
6677a3d3
JB
1034 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1035 * Can be overridden.
afc003b2
JB
1036 *
1037 * @virtual
729c563d 1038 */
280c2a77 1039 protected setupHook (): void {
965df41c 1040 /* Intentionally empty */
280c2a77 1041 }
c97c7edb 1042
729c563d 1043 /**
280c2a77
S
1044 * Should return whether the worker is the main worker or not.
1045 */
1046 protected abstract isMain (): boolean
1047
1048 /**
2e81254d 1049 * Hook executed before the worker task execution.
bf9549ae 1050 * Can be overridden.
729c563d 1051 *
f06e48d8 1052 * @param workerNodeKey - The worker node key.
1c6fe997 1053 * @param task - The task to execute.
729c563d 1054 */
1c6fe997
JB
1055 protected beforeTaskExecutionHook (
1056 workerNodeKey: number,
1057 task: Task<Data>
1058 ): void {
94407def
JB
1059 if (this.workerNodes[workerNodeKey]?.usage != null) {
1060 const workerUsage = this.workerNodes[workerNodeKey].usage
1061 ++workerUsage.tasks.executing
c329fd41
JB
1062 updateWaitTimeWorkerUsage(
1063 this.workerChoiceStrategyContext,
1064 workerUsage,
1065 task
1066 )
94407def
JB
1067 }
1068 if (
1069 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1070 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
1071 task.name as string
1072 ) != null
1073 ) {
db0e38ee 1074 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 1075 workerNodeKey
db0e38ee 1076 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
5623b8d5 1077 ++taskFunctionWorkerUsage.tasks.executing
c329fd41
JB
1078 updateWaitTimeWorkerUsage(
1079 this.workerChoiceStrategyContext,
1080 taskFunctionWorkerUsage,
1081 task
1082 )
b558f6b5 1083 }
c97c7edb
S
1084 }
1085
c01733f1 1086 /**
2e81254d 1087 * Hook executed after the worker task execution.
bf9549ae 1088 * Can be overridden.
c01733f1 1089 *
501aea93 1090 * @param workerNodeKey - The worker node key.
38e795c1 1091 * @param message - The received message.
c01733f1 1092 */
2e81254d 1093 protected afterTaskExecutionHook (
501aea93 1094 workerNodeKey: number,
2740a743 1095 message: MessageValue<Response>
bf9549ae 1096 ): void {
c329fd41 1097 let needWorkerChoiceStrategyUpdate = false
94407def
JB
1098 if (this.workerNodes[workerNodeKey]?.usage != null) {
1099 const workerUsage = this.workerNodes[workerNodeKey].usage
c329fd41
JB
1100 updateTaskStatisticsWorkerUsage(workerUsage, message)
1101 updateRunTimeWorkerUsage(
1102 this.workerChoiceStrategyContext,
1103 workerUsage,
1104 message
1105 )
1106 updateEluWorkerUsage(
1107 this.workerChoiceStrategyContext,
1108 workerUsage,
1109 message
1110 )
1111 needWorkerChoiceStrategyUpdate = true
94407def
JB
1112 }
1113 if (
1114 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1115 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
5623b8d5 1116 message.taskPerformance?.name as string
94407def
JB
1117 ) != null
1118 ) {
db0e38ee 1119 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 1120 workerNodeKey
db0e38ee 1121 ].getTaskFunctionWorkerUsage(
0628755c 1122 message.taskPerformance?.name as string
b558f6b5 1123 ) as WorkerUsage
c329fd41
JB
1124 updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
1125 updateRunTimeWorkerUsage(
1126 this.workerChoiceStrategyContext,
1127 taskFunctionWorkerUsage,
1128 message
1129 )
1130 updateEluWorkerUsage(
1131 this.workerChoiceStrategyContext,
1132 taskFunctionWorkerUsage,
1133 message
1134 )
1135 needWorkerChoiceStrategyUpdate = true
1136 }
1137 if (needWorkerChoiceStrategyUpdate) {
1138 this.workerChoiceStrategyContext.update(workerNodeKey)
b558f6b5
JB
1139 }
1140 }
1141
db0e38ee
JB
1142 /**
1143 * Whether the worker node shall update its task function worker usage or not.
1144 *
1145 * @param workerNodeKey - The worker node key.
1146 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1147 */
1148 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
a5d15204 1149 const workerInfo = this.getWorkerInfo(workerNodeKey)
b558f6b5 1150 return (
94407def 1151 workerInfo != null &&
6703b9f4
JB
1152 Array.isArray(workerInfo.taskFunctionNames) &&
1153 workerInfo.taskFunctionNames.length > 2
b558f6b5 1154 )
f1c06930
JB
1155 }
1156
280c2a77 1157 /**
f06e48d8 1158 * Chooses a worker node for the next task.
280c2a77 1159 *
6c6afb84 1160 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 1161 *
aa9eede8 1162 * @returns The chosen worker node key
280c2a77 1163 */
6c6afb84 1164 private chooseWorkerNode (): number {
930dcf12 1165 if (this.shallCreateDynamicWorker()) {
aa9eede8 1166 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
6c6afb84 1167 if (
b1aae695 1168 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
6c6afb84 1169 ) {
aa9eede8 1170 return workerNodeKey
6c6afb84 1171 }
17393ac8 1172 }
930dcf12
JB
1173 return this.workerChoiceStrategyContext.execute()
1174 }
1175
6c6afb84
JB
1176 /**
1177 * Conditions for dynamic worker creation.
1178 *
1179 * @returns Whether to create a dynamic worker or not.
1180 */
1181 private shallCreateDynamicWorker (): boolean {
930dcf12 1182 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
1183 }
1184
280c2a77 1185 /**
aa9eede8 1186 * Sends a message to worker given its worker node key.
280c2a77 1187 *
aa9eede8 1188 * @param workerNodeKey - The worker node key.
38e795c1 1189 * @param message - The message.
7d91a8cd 1190 * @param transferList - The optional array of transferable objects.
280c2a77
S
1191 */
1192 protected abstract sendToWorker (
aa9eede8 1193 workerNodeKey: number,
7d91a8cd
JB
1194 message: MessageValue<Data>,
1195 transferList?: TransferListItem[]
280c2a77
S
1196 ): void
1197
4a6952ff 1198 /**
aa9eede8 1199 * Creates a new, completely set up worker node.
4a6952ff 1200 *
aa9eede8 1201 * @returns New, completely set up worker node key.
4a6952ff 1202 */
aa9eede8 1203 protected createAndSetupWorkerNode (): number {
c3719753
JB
1204 const workerNode = this.createWorkerNode()
1205 workerNode.registerWorkerEventHandler(
1206 'online',
1207 this.opts.onlineHandler ?? EMPTY_FUNCTION
1208 )
1209 workerNode.registerWorkerEventHandler(
1210 'message',
1211 this.opts.messageHandler ?? EMPTY_FUNCTION
1212 )
1213 workerNode.registerWorkerEventHandler(
1214 'error',
1215 this.opts.errorHandler ?? EMPTY_FUNCTION
1216 )
1217 workerNode.registerWorkerEventHandler('error', (error: Error) => {
07e0c9e5 1218 workerNode.info.ready = false
2a69b8c5 1219 this.emitter?.emit(PoolEvents.error, error)
15b176e0 1220 if (
b6bfca01 1221 this.started &&
9b38ab2d 1222 !this.starting &&
711623b8 1223 !this.destroying &&
9b38ab2d 1224 this.opts.restartWorkerOnError === true
15b176e0 1225 ) {
07e0c9e5 1226 if (workerNode.info.dynamic) {
aa9eede8 1227 this.createAndSetupDynamicWorkerNode()
8a1260a3 1228 } else {
aa9eede8 1229 this.createAndSetupWorkerNode()
8a1260a3 1230 }
5baee0d7 1231 }
6d59c3a3 1232 if (this.started && this.opts.enableTasksQueue === true) {
9974369e 1233 this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
19dbc45b 1234 }
32b141fd 1235 workerNode?.terminate().catch(error => {
07e0c9e5
JB
1236 this.emitter?.emit(PoolEvents.error, error)
1237 })
5baee0d7 1238 })
c3719753
JB
1239 workerNode.registerWorkerEventHandler(
1240 'exit',
1241 this.opts.exitHandler ?? EMPTY_FUNCTION
1242 )
1243 workerNode.registerOnceWorkerEventHandler('exit', () => {
9974369e 1244 this.removeWorkerNode(workerNode)
a974afa6 1245 })
c3719753 1246 const workerNodeKey = this.addWorkerNode(workerNode)
aa9eede8 1247 this.afterWorkerNodeSetup(workerNodeKey)
aa9eede8 1248 return workerNodeKey
c97c7edb 1249 }
be0676b3 1250
930dcf12 1251 /**
aa9eede8 1252 * Creates a new, completely set up dynamic worker node.
930dcf12 1253 *
aa9eede8 1254 * @returns New, completely set up dynamic worker node key.
930dcf12 1255 */
aa9eede8
JB
1256 protected createAndSetupDynamicWorkerNode (): number {
1257 const workerNodeKey = this.createAndSetupWorkerNode()
041dc05b 1258 this.registerWorkerMessageListener(workerNodeKey, message => {
4d159167 1259 this.checkMessageWorkerId(message)
aa9eede8
JB
1260 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1261 message.workerId
aad6fb64 1262 )
aa9eede8 1263 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
81c02522 1264 // Kill message received from worker
930dcf12
JB
1265 if (
1266 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1e3214b6 1267 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
7b56f532 1268 ((this.opts.enableTasksQueue === false &&
aa9eede8 1269 workerUsage.tasks.executing === 0) ||
7b56f532 1270 (this.opts.enableTasksQueue === true &&
aa9eede8
JB
1271 workerUsage.tasks.executing === 0 &&
1272 this.tasksQueueSize(localWorkerNodeKey) === 0)))
930dcf12 1273 ) {
f3827d5d 1274 // Flag the worker node as not ready immediately
ae3ab61d 1275 this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
041dc05b 1276 this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
5270d253
JB
1277 this.emitter?.emit(PoolEvents.error, error)
1278 })
930dcf12
JB
1279 }
1280 })
46b0bb09 1281 const workerInfo = this.getWorkerInfo(workerNodeKey)
aa9eede8 1282 this.sendToWorker(workerNodeKey, {
e9dd5b66 1283 checkActive: true
21f710aa 1284 })
72ae84a2
JB
1285 if (this.taskFunctions.size > 0) {
1286 for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
1287 this.sendTaskFunctionOperationToWorker(workerNodeKey, {
1288 taskFunctionOperation: 'add',
1289 taskFunctionName,
1290 taskFunction: taskFunction.toString()
1291 }).catch(error => {
1292 this.emitter?.emit(PoolEvents.error, error)
1293 })
1294 }
1295 }
b5e113f6 1296 workerInfo.dynamic = true
b1aae695
JB
1297 if (
1298 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1299 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1300 ) {
b5e113f6
JB
1301 workerInfo.ready = true
1302 }
33e6bb4c 1303 this.checkAndEmitDynamicWorkerCreationEvents()
aa9eede8 1304 return workerNodeKey
930dcf12
JB
1305 }
1306
a2ed5053 1307 /**
aa9eede8 1308 * Registers a listener callback on the worker given its worker node key.
a2ed5053 1309 *
aa9eede8 1310 * @param workerNodeKey - The worker node key.
a2ed5053
JB
1311 * @param listener - The message listener callback.
1312 */
85aeb3f3
JB
1313 protected abstract registerWorkerMessageListener<
1314 Message extends Data | Response
aa9eede8
JB
1315 >(
1316 workerNodeKey: number,
1317 listener: (message: MessageValue<Message>) => void
1318 ): void
a2ed5053 1319
ae036c3e
JB
1320 /**
1321 * Registers once a listener callback on the worker given its worker node key.
1322 *
1323 * @param workerNodeKey - The worker node key.
1324 * @param listener - The message listener callback.
1325 */
1326 protected abstract registerOnceWorkerMessageListener<
1327 Message extends Data | Response
1328 >(
1329 workerNodeKey: number,
1330 listener: (message: MessageValue<Message>) => void
1331 ): void
1332
1333 /**
1334 * Deregisters a listener callback on the worker given its worker node key.
1335 *
1336 * @param workerNodeKey - The worker node key.
1337 * @param listener - The message listener callback.
1338 */
1339 protected abstract deregisterWorkerMessageListener<
1340 Message extends Data | Response
1341 >(
1342 workerNodeKey: number,
1343 listener: (message: MessageValue<Message>) => void
1344 ): void
1345
a2ed5053 1346 /**
aa9eede8 1347 * Method hooked up after a worker node has been newly created.
a2ed5053
JB
1348 * Can be overridden.
1349 *
aa9eede8 1350 * @param workerNodeKey - The newly created worker node key.
a2ed5053 1351 */
aa9eede8 1352 protected afterWorkerNodeSetup (workerNodeKey: number): void {
a2ed5053 1353 // Listen to worker messages.
fcd39179
JB
1354 this.registerWorkerMessageListener(
1355 workerNodeKey,
3a776b6d 1356 this.workerMessageListener
fcd39179 1357 )
85aeb3f3 1358 // Send the startup message to worker.
aa9eede8 1359 this.sendStartupMessageToWorker(workerNodeKey)
9edb9717
JB
1360 // Send the statistics message to worker.
1361 this.sendStatisticsMessageToWorker(workerNodeKey)
72695f86 1362 if (this.opts.enableTasksQueue === true) {
dbd73092 1363 if (this.opts.tasksQueueOptions?.taskStealing === true) {
e1c2dba7 1364 this.workerNodes[workerNodeKey].on(
65542a35 1365 'idleWorkerNode',
e1c2dba7 1366 this.handleIdleWorkerNodeEvent
9f95d5eb 1367 )
47352846
JB
1368 }
1369 if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
e1c2dba7 1370 this.workerNodes[workerNodeKey].on(
b5e75be8 1371 'backPressure',
e1c2dba7 1372 this.handleBackPressureEvent
9f95d5eb 1373 )
47352846 1374 }
72695f86 1375 }
d2c73f82
JB
1376 }
1377
85aeb3f3 1378 /**
aa9eede8
JB
1379 * Sends the startup message to worker given its worker node key.
1380 *
1381 * @param workerNodeKey - The worker node key.
1382 */
1383 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1384
1385 /**
9edb9717 1386 * Sends the statistics message to worker given its worker node key.
85aeb3f3 1387 *
aa9eede8 1388 * @param workerNodeKey - The worker node key.
85aeb3f3 1389 */
9edb9717 1390 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
aa9eede8
JB
1391 this.sendToWorker(workerNodeKey, {
1392 statistics: {
1393 runTime:
1394 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1395 .runTime.aggregate,
1396 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1397 .elu.aggregate
72ae84a2 1398 }
aa9eede8
JB
1399 })
1400 }
a2ed5053 1401
42c677c1
JB
1402 private handleTask (workerNodeKey: number, task: Task<Data>): void {
1403 if (this.shallExecuteTask(workerNodeKey)) {
1404 this.executeTask(workerNodeKey, task)
1405 } else {
1406 this.enqueueTask(workerNodeKey, task)
1407 }
1408 }
1409
a2ed5053 1410 private redistributeQueuedTasks (workerNodeKey: number): void {
32b141fd
JB
1411 if (workerNodeKey === -1) {
1412 return
1413 }
cb71d660
JB
1414 if (this.workerNodes.length <= 1) {
1415 return
1416 }
a2ed5053 1417 while (this.tasksQueueSize(workerNodeKey) > 0) {
f201a0cd
JB
1418 const destinationWorkerNodeKey = this.workerNodes.reduce(
1419 (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
852ed3e4
JB
1420 return workerNode.info.ready &&
1421 workerNode.usage.tasks.queued <
1422 workerNodes[minWorkerNodeKey].usage.tasks.queued
f201a0cd
JB
1423 ? workerNodeKey
1424 : minWorkerNodeKey
1425 },
1426 0
1427 )
42c677c1
JB
1428 this.handleTask(
1429 destinationWorkerNodeKey,
1430 this.dequeueTask(workerNodeKey) as Task<Data>
1431 )
dd951876
JB
1432 }
1433 }
1434
b1838604
JB
1435 private updateTaskStolenStatisticsWorkerUsage (
1436 workerNodeKey: number,
b1838604
JB
1437 taskName: string
1438 ): void {
1a880eca 1439 const workerNode = this.workerNodes[workerNodeKey]
b1838604
JB
1440 if (workerNode?.usage != null) {
1441 ++workerNode.usage.tasks.stolen
1442 }
1443 if (
1444 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1445 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1446 ) {
1447 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1448 taskName
1449 ) as WorkerUsage
1450 ++taskFunctionWorkerUsage.tasks.stolen
1451 }
1452 }
1453
463226a4 1454 private updateTaskSequentiallyStolenStatisticsWorkerUsage (
f1f77f45 1455 workerNodeKey: number
463226a4
JB
1456 ): void {
1457 const workerNode = this.workerNodes[workerNodeKey]
1458 if (workerNode?.usage != null) {
1459 ++workerNode.usage.tasks.sequentiallyStolen
1460 }
f1f77f45
JB
1461 }
1462
1463 private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1464 workerNodeKey: number,
1465 taskName: string
1466 ): void {
1467 const workerNode = this.workerNodes[workerNodeKey]
463226a4
JB
1468 if (
1469 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1470 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1471 ) {
1472 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1473 taskName
1474 ) as WorkerUsage
1475 ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
1476 }
1477 }
1478
1479 private resetTaskSequentiallyStolenStatisticsWorkerUsage (
f1f77f45 1480 workerNodeKey: number
463226a4
JB
1481 ): void {
1482 const workerNode = this.workerNodes[workerNodeKey]
1483 if (workerNode?.usage != null) {
1484 workerNode.usage.tasks.sequentiallyStolen = 0
1485 }
f1f77f45
JB
1486 }
1487
1488 private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1489 workerNodeKey: number,
1490 taskName: string
1491 ): void {
1492 const workerNode = this.workerNodes[workerNodeKey]
463226a4
JB
1493 if (
1494 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1495 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1496 ) {
1497 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1498 taskName
1499 ) as WorkerUsage
1500 taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
1501 }
1502 }
1503
65542a35 1504 private readonly handleIdleWorkerNodeEvent = (
e1c2dba7 1505 eventDetail: WorkerNodeEventDetail,
463226a4 1506 previousStolenTask?: Task<Data>
9f95d5eb 1507 ): void => {
cb71d660
JB
1508 if (this.workerNodes.length <= 1) {
1509 return
1510 }
e1c2dba7 1511 const { workerNodeKey } = eventDetail
463226a4
JB
1512 if (workerNodeKey == null) {
1513 throw new Error(
1514 'WorkerNode event detail workerNodeKey attribute must be defined'
1515 )
1516 }
1517 const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
1518 if (
1519 previousStolenTask != null &&
1520 workerNodeTasksUsage.sequentiallyStolen > 0 &&
1521 (workerNodeTasksUsage.executing > 0 ||
1522 this.tasksQueueSize(workerNodeKey) > 0)
1523 ) {
f1f77f45
JB
1524 for (const taskName of this.workerNodes[workerNodeKey].info
1525 .taskFunctionNames as string[]) {
1526 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1527 workerNodeKey,
1528 taskName
1529 )
1530 }
1531 this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
463226a4
JB
1532 return
1533 }
1534 const stolenTask = this.workerNodeStealTask(workerNodeKey)
f1f77f45
JB
1535 if (
1536 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1537 stolenTask != null
1538 ) {
1539 const taskFunctionTasksWorkerUsage = this.workerNodes[
1540 workerNodeKey
1541 ].getTaskFunctionWorkerUsage(stolenTask.name as string)
1542 ?.tasks as TaskStatistics
1543 if (
1544 taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
1545 (previousStolenTask != null &&
1546 previousStolenTask.name === stolenTask.name &&
1547 taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
1548 ) {
1549 this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1550 workerNodeKey,
1551 stolenTask.name as string
1552 )
1553 } else {
1554 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1555 workerNodeKey,
1556 stolenTask.name as string
1557 )
1558 }
1559 }
463226a4
JB
1560 sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
1561 .then(() => {
e1c2dba7 1562 this.handleIdleWorkerNodeEvent(eventDetail, stolenTask)
463226a4
JB
1563 return undefined
1564 })
1565 .catch(EMPTY_FUNCTION)
1566 }
1567
1568 private readonly workerNodeStealTask = (
1569 workerNodeKey: number
1570 ): Task<Data> | undefined => {
dd951876 1571 const workerNodes = this.workerNodes
a6b3272b 1572 .slice()
dd951876
JB
1573 .sort(
1574 (workerNodeA, workerNodeB) =>
1575 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1576 )
f201a0cd 1577 const sourceWorkerNode = workerNodes.find(
463226a4
JB
1578 (sourceWorkerNode, sourceWorkerNodeKey) =>
1579 sourceWorkerNode.info.ready &&
1580 sourceWorkerNodeKey !== workerNodeKey &&
1581 sourceWorkerNode.usage.tasks.queued > 0
f201a0cd
JB
1582 )
1583 if (sourceWorkerNode != null) {
72ae84a2 1584 const task = sourceWorkerNode.popTask() as Task<Data>
42c677c1 1585 this.handleTask(workerNodeKey, task)
f1f77f45 1586 this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
f201a0cd 1587 this.updateTaskStolenStatisticsWorkerUsage(
463226a4 1588 workerNodeKey,
f201a0cd
JB
1589 task.name as string
1590 )
463226a4 1591 return task
72695f86
JB
1592 }
1593 }
1594
9f95d5eb 1595 private readonly handleBackPressureEvent = (
e1c2dba7 1596 eventDetail: WorkerNodeEventDetail
9f95d5eb 1597 ): void => {
cb71d660
JB
1598 if (this.workerNodes.length <= 1) {
1599 return
1600 }
e1c2dba7 1601 const { workerId } = eventDetail
f778c355
JB
1602 const sizeOffset = 1
1603 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
68dbcdc0
JB
1604 return
1605 }
72695f86 1606 const sourceWorkerNode =
b5e75be8 1607 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
72695f86 1608 const workerNodes = this.workerNodes
a6b3272b 1609 .slice()
72695f86
JB
1610 .sort(
1611 (workerNodeA, workerNodeB) =>
1612 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1613 )
1614 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1615 if (
0bc68267 1616 sourceWorkerNode.usage.tasks.queued > 0 &&
a6b3272b 1617 workerNode.info.ready &&
b5e75be8 1618 workerNode.info.id !== workerId &&
0bc68267 1619 workerNode.usage.tasks.queued <
f778c355 1620 (this.opts.tasksQueueOptions?.size as number) - sizeOffset
72695f86 1621 ) {
72ae84a2 1622 const task = sourceWorkerNode.popTask() as Task<Data>
42c677c1 1623 this.handleTask(workerNodeKey, task)
b1838604
JB
1624 this.updateTaskStolenStatisticsWorkerUsage(
1625 workerNodeKey,
b1838604
JB
1626 task.name as string
1627 )
10ecf8fd 1628 }
a2ed5053
JB
1629 }
1630 }
1631
be0676b3 1632 /**
fcd39179 1633 * This method is the message listener registered on each worker.
be0676b3 1634 */
3a776b6d
JB
1635 protected readonly workerMessageListener = (
1636 message: MessageValue<Response>
1637 ): void => {
fcd39179 1638 this.checkMessageWorkerId(message)
b641345c
JB
1639 const { workerId, ready, taskId, taskFunctionNames } = message
1640 if (ready != null && taskFunctionNames != null) {
fcd39179
JB
1641 // Worker ready response received from worker
1642 this.handleWorkerReadyResponse(message)
b641345c 1643 } else if (taskId != null) {
fcd39179
JB
1644 // Task execution response received from worker
1645 this.handleTaskExecutionResponse(message)
b641345c 1646 } else if (taskFunctionNames != null) {
fcd39179
JB
1647 // Task function names message received from worker
1648 this.getWorkerInfo(
b641345c
JB
1649 this.getWorkerNodeKeyByWorkerId(workerId)
1650 ).taskFunctionNames = taskFunctionNames
6b272951
JB
1651 }
1652 }
1653
10e2aa7e 1654 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
463226a4
JB
1655 const { workerId, ready, taskFunctionNames } = message
1656 if (ready === false) {
1657 throw new Error(`Worker ${workerId as number} failed to initialize`)
f05ed93c 1658 }
a5d15204 1659 const workerInfo = this.getWorkerInfo(
463226a4 1660 this.getWorkerNodeKeyByWorkerId(workerId)
46b0bb09 1661 )
463226a4
JB
1662 workerInfo.ready = ready as boolean
1663 workerInfo.taskFunctionNames = taskFunctionNames
55082af9
JB
1664 if (!this.readyEventEmitted && this.ready) {
1665 this.readyEventEmitted = true
1666 this.emitter?.emit(PoolEvents.ready, this.info)
2431bdb4 1667 }
6b272951
JB
1668 }
1669
1670 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
463226a4 1671 const { workerId, taskId, workerError, data } = message
5441aea6 1672 const promiseResponse = this.promiseResponseMap.get(taskId as string)
6b272951 1673 if (promiseResponse != null) {
f18fd12b 1674 const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
9b358e72 1675 const workerNode = this.workerNodes[workerNodeKey]
6703b9f4
JB
1676 if (workerError != null) {
1677 this.emitter?.emit(PoolEvents.taskError, workerError)
f18fd12b
JB
1678 asyncResource != null
1679 ? asyncResource.runInAsyncScope(
1680 reject,
1681 this.emitter,
1682 workerError.message
1683 )
1684 : reject(workerError.message)
6b272951 1685 } else {
f18fd12b
JB
1686 asyncResource != null
1687 ? asyncResource.runInAsyncScope(resolve, this.emitter, data)
1688 : resolve(data as Response)
6b272951 1689 }
f18fd12b 1690 asyncResource?.emitDestroy()
501aea93 1691 this.afterTaskExecutionHook(workerNodeKey, message)
5441aea6 1692 this.promiseResponseMap.delete(taskId as string)
3c8a79b4 1693 workerNode?.emit('taskFinished', taskId)
85b2561d 1694 if (this.opts.enableTasksQueue === true && !this.destroying) {
9b358e72 1695 const workerNodeTasksUsage = workerNode.usage.tasks
463226a4
JB
1696 if (
1697 this.tasksQueueSize(workerNodeKey) > 0 &&
1698 workerNodeTasksUsage.executing <
1699 (this.opts.tasksQueueOptions?.concurrency as number)
1700 ) {
1701 this.executeTask(
1702 workerNodeKey,
1703 this.dequeueTask(workerNodeKey) as Task<Data>
1704 )
1705 }
1706 if (
1707 workerNodeTasksUsage.executing === 0 &&
1708 this.tasksQueueSize(workerNodeKey) === 0 &&
1709 workerNodeTasksUsage.sequentiallyStolen === 0
1710 ) {
9b358e72 1711 workerNode.emit('idleWorkerNode', {
e1c2dba7
JB
1712 workerId: workerId as number,
1713 workerNodeKey
1714 })
463226a4 1715 }
be0676b3
APA
1716 }
1717 }
be0676b3 1718 }
7c0ba920 1719
a1763c54 1720 private checkAndEmitTaskExecutionEvents (): void {
33e6bb4c
JB
1721 if (this.busy) {
1722 this.emitter?.emit(PoolEvents.busy, this.info)
a1763c54
JB
1723 }
1724 }
1725
1726 private checkAndEmitTaskQueuingEvents (): void {
1727 if (this.hasBackPressure()) {
1728 this.emitter?.emit(PoolEvents.backPressure, this.info)
164d950a
JB
1729 }
1730 }
1731
33e6bb4c
JB
1732 private checkAndEmitDynamicWorkerCreationEvents (): void {
1733 if (this.type === PoolTypes.dynamic) {
1734 if (this.full) {
1735 this.emitter?.emit(PoolEvents.full, this.info)
1736 }
1737 }
1738 }
1739
8a1260a3 1740 /**
aa9eede8 1741 * Gets the worker information given its worker node key.
8a1260a3
JB
1742 *
1743 * @param workerNodeKey - The worker node key.
3f09ed9f 1744 * @returns The worker information.
8a1260a3 1745 */
46b0bb09 1746 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
dbfa7948 1747 return this.workerNodes[workerNodeKey]?.info
e221309a
JB
1748 }
1749
a05c10de 1750 /**
c3719753 1751 * Creates a worker node.
ea7a90d3 1752 *
c3719753 1753 * @returns The created worker node.
ea7a90d3 1754 */
c3719753 1755 private createWorkerNode (): IWorkerNode<Worker, Data> {
671d5154 1756 const workerNode = new WorkerNode<Worker, Data>(
c3719753
JB
1757 this.worker,
1758 this.filePath,
1759 {
1760 env: this.opts.env,
1761 workerOptions: this.opts.workerOptions,
1762 tasksQueueBackPressureSize:
32b141fd 1763 this.opts.tasksQueueOptions?.size ??
26ce26ca
JB
1764 getDefaultTasksQueueOptions(
1765 this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
1766 ).size
c3719753 1767 }
671d5154 1768 )
b97d82d8 1769 // Flag the worker node as ready at pool startup.
d2c73f82
JB
1770 if (this.starting) {
1771 workerNode.info.ready = true
1772 }
c3719753
JB
1773 return workerNode
1774 }
1775
1776 /**
1777 * Adds the given worker node in the pool worker nodes.
1778 *
1779 * @param workerNode - The worker node.
1780 * @returns The added worker node key.
1781 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1782 */
1783 private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
aa9eede8 1784 this.workerNodes.push(workerNode)
c3719753 1785 const workerNodeKey = this.workerNodes.indexOf(workerNode)
aa9eede8 1786 if (workerNodeKey === -1) {
86ed0598 1787 throw new Error('Worker added not found in worker nodes')
aa9eede8
JB
1788 }
1789 return workerNodeKey
ea7a90d3 1790 }
c923ce56 1791
51fe3d3c 1792 /**
9974369e 1793 * Removes the worker node from the pool worker nodes.
51fe3d3c 1794 *
9974369e 1795 * @param workerNode - The worker node.
51fe3d3c 1796 */
9974369e
JB
1797 private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
1798 const workerNodeKey = this.workerNodes.indexOf(workerNode)
1f68cede
JB
1799 if (workerNodeKey !== -1) {
1800 this.workerNodes.splice(workerNodeKey, 1)
1801 this.workerChoiceStrategyContext.remove(workerNodeKey)
1802 }
51fe3d3c 1803 }
adc3c320 1804
ae3ab61d
JB
1805 protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
1806 this.getWorkerInfo(workerNodeKey).ready = false
1807 }
1808
e2b31e32
JB
1809 /** @inheritDoc */
1810 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
9e844245 1811 return (
e2b31e32
JB
1812 this.opts.enableTasksQueue === true &&
1813 this.workerNodes[workerNodeKey].hasBackPressure()
9e844245
JB
1814 )
1815 }
1816
1817 private hasBackPressure (): boolean {
1818 return (
1819 this.opts.enableTasksQueue === true &&
1820 this.workerNodes.findIndex(
041dc05b 1821 workerNode => !workerNode.hasBackPressure()
a1763c54 1822 ) === -1
9e844245 1823 )
e2b31e32
JB
1824 }
1825
b0a4db63 1826 /**
aa9eede8 1827 * Executes the given task on the worker given its worker node key.
b0a4db63 1828 *
aa9eede8 1829 * @param workerNodeKey - The worker node key.
b0a4db63
JB
1830 * @param task - The task to execute.
1831 */
2e81254d 1832 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1833 this.beforeTaskExecutionHook(workerNodeKey, task)
bbfa38a2 1834 this.sendToWorker(workerNodeKey, task, task.transferList)
a1763c54 1835 this.checkAndEmitTaskExecutionEvents()
2e81254d
JB
1836 }
1837
f9f00b5f 1838 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
a1763c54
JB
1839 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1840 this.checkAndEmitTaskQueuingEvents()
1841 return tasksQueueSize
adc3c320
JB
1842 }
1843
416fd65c 1844 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1845 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1846 }
1847
416fd65c 1848 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1849 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1850 }
1851
87347ea8
JB
1852 protected flushTasksQueue (workerNodeKey: number): number {
1853 let flushedTasks = 0
920278a2
JB
1854 while (this.tasksQueueSize(workerNodeKey) > 0) {
1855 this.executeTask(
1856 workerNodeKey,
1857 this.dequeueTask(workerNodeKey) as Task<Data>
1858 )
87347ea8 1859 ++flushedTasks
ff733df7 1860 }
4b628b48 1861 this.workerNodes[workerNodeKey].clearTasksQueue()
87347ea8 1862 return flushedTasks
ff733df7
JB
1863 }
1864
ef41a6e6
JB
1865 private flushTasksQueues (): void {
1866 for (const [workerNodeKey] of this.workerNodes.entries()) {
1867 this.flushTasksQueue(workerNodeKey)
1868 }
1869 }
c97c7edb 1870}