Merge branch 'master' of github.com:poolifier/poolifier
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
ef083f7b 3import type { TransferListItem } from 'node:worker_threads'
f80125ca 4import { EventEmitterAsyncResource } from 'node:events'
f18fd12b 5import { AsyncResource } from 'node:async_hooks'
5c4d16da
JB
6import type {
7 MessageValue,
8 PromiseResponseWrapper,
76d91ea0 9 Task
5c4d16da 10} from '../utility-types'
bbeadd16 11import {
ff128cc9 12 DEFAULT_TASK_NAME,
bbeadd16 13 EMPTY_FUNCTION,
dc021bcc 14 average,
463226a4 15 exponentialDelay,
59317253 16 isKillBehavior,
0d80593b 17 isPlainObject,
90d6701c 18 max,
afe0d5bf 19 median,
90d6701c 20 min,
463226a4
JB
21 round,
22 sleep
bbeadd16 23} from '../utils'
59317253 24import { KillBehaviors } from '../worker/worker-options'
6703b9f4 25import type { TaskFunction } from '../worker/task-functions'
c4855468 26import {
65d7a1c9 27 type IPool,
c4855468 28 PoolEvents,
6b27d407 29 type PoolInfo,
c4855468 30 type PoolOptions,
6b27d407
JB
31 type PoolType,
32 PoolTypes,
4b628b48 33 type TasksQueueOptions
c4855468 34} from './pool'
4d159167
JB
35import type {
36 IWorker,
37 IWorkerNode,
f1f77f45 38 TaskStatistics,
4d159167 39 WorkerInfo,
9f95d5eb 40 WorkerNodeEventDetail,
4d159167
JB
41 WorkerType,
42 WorkerUsage
e102732c 43} from './worker'
a35560ba 44import {
f0d7f803 45 Measurements,
a35560ba 46 WorkerChoiceStrategies,
a20f0ba5
JB
47 type WorkerChoiceStrategy,
48 type WorkerChoiceStrategyOptions
bdaf31cd
JB
49} from './selection-strategies/selection-strategies-types'
50import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 51import { version } from './version'
4b628b48 52import { WorkerNode } from './worker-node'
bde6b5d7
JB
53import {
54 checkFilePath,
55 checkValidTasksQueueOptions,
bfc75cca 56 checkValidWorkerChoiceStrategy,
32b141fd 57 getDefaultTasksQueueOptions,
c329fd41
JB
58 updateEluWorkerUsage,
59 updateRunTimeWorkerUsage,
60 updateTaskStatisticsWorkerUsage,
61 updateWaitTimeWorkerUsage,
87347ea8 62 waitWorkerNodeEvents
bde6b5d7 63} from './utils'
23ccf9d7 64
729c563d 65/**
ea7a90d3 66 * Base class that implements some shared logic for all poolifier pools.
729c563d 67 *
38e795c1 68 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
69 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
70 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 71 */
c97c7edb 72export abstract class AbstractPool<
f06e48d8 73 Worker extends IWorker,
d3c8a1a8
S
74 Data = unknown,
75 Response = unknown
c4855468 76> implements IPool<Worker, Data, Response> {
afc003b2 77 /** @inheritDoc */
4b628b48 78 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 79
afc003b2 80 /** @inheritDoc */
f80125ca 81 public emitter?: EventEmitterAsyncResource
7c0ba920 82
be0676b3 83 /**
419e8121 84 * The task execution response promise map:
2740a743 85 * - `key`: The message id of each submitted task.
a3445496 86 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 87 *
a3445496 88 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 89 */
501aea93
JB
90 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
91 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 92
a35560ba 93 /**
51fe3d3c 94 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
95 */
96 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
97 Worker,
98 Data,
99 Response
a35560ba
S
100 >
101
72ae84a2
JB
102 /**
103 * The task functions added at runtime map:
104 * - `key`: The task function name.
105 * - `value`: The task function itself.
106 */
107 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
108
15b176e0
JB
109 /**
110 * Whether the pool is started or not.
111 */
112 private started: boolean
47352846
JB
113 /**
114 * Whether the pool is starting or not.
115 */
116 private starting: boolean
711623b8
JB
117 /**
118 * Whether the pool is destroying or not.
119 */
120 private destroying: boolean
55082af9
JB
121 /**
122 * Whether the pool ready event has been emitted or not.
123 */
124 private readyEventEmitted: boolean
afe0d5bf
JB
125 /**
126 * The start timestamp of the pool.
127 */
128 private readonly startTimestamp
129
729c563d
S
130 /**
131 * Constructs a new poolifier pool.
132 *
00e1bdeb 133 * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
029715f0 134 * @param filePath - Path to the worker file.
38e795c1 135 * @param opts - Options for the pool.
00e1bdeb 136 * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
729c563d 137 */
public constructor (
  protected readonly minimumNumberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>,
  protected readonly maximumNumberOfWorkers?: number
) {
  // Refuse to build the pool when isMain() reports false.
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Validate the constructor arguments; checkPoolOptions() also fills in
  // the default values of the options it handles.
  this.checkPoolType()
  checkFilePath(this.filePath)
  this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers)
  this.checkPoolOptions(this.opts)

  // Permanently bind `this` on these methods.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  // The event emitter only exists when events are enabled.
  if (this.opts.enableEvents === true) {
    this.initializeEventEmitter()
  }

  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  this.setupHook()

  this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()

  // Lifecycle flags: nothing is started, starting or being destroyed yet.
  this.readyEventEmitted = false
  this.destroying = false
  this.starting = false
  this.started = false

  if (this.opts.startWorkers === true) {
    this.start()
  }

  // Recorded last, i.e. after the optional start() above has returned.
  this.startTimestamp = performance.now()
}
185
26ce26ca
JB
/**
 * Rejects a fixed pool that was given a maximum number of workers.
 *
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool type is fixed and `maximumNumberOfWorkers` is specified.
 */
private checkPoolType (): void {
  const isFixedPool = this.type === PoolTypes.fixed
  if (!isFixedPool || this.maximumNumberOfWorkers == null) {
    return
  }
  throw new Error(
    'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
  )
}
193
af7f2788
JB
/**
 * Validates the minimum number of workers given at pool construction.
 *
 * The checks run in a fixed order: presence, safe integer, non negative and,
 * for a fixed pool only, non zero.
 *
 * @param minimumNumberOfWorkers - The minimum number of workers to validate.
 */
private checkMinimumNumberOfWorkers (minimumNumberOfWorkers: number): void {
  if (minimumNumberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(minimumNumberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (minimumNumberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && minimumNumberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
211
/**
 * Validates the given pool options and stores them, defaults applied, on `this.opts`.
 *
 * @param opts - The pool options to validate.
 * @throws {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/TypeError} If `opts` is not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  this.opts.startWorkers = opts.startWorkers ?? true

  // Worker choice strategy: validated first, then defaulted to round robin.
  checkValidWorkerChoiceStrategy(
    opts.workerChoiceStrategy as WorkerChoiceStrategy
  )
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN

  // Worker choice strategy options: stored only when they are provided.
  this.checkValidWorkerChoiceStrategyOptions(
    opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
  )
  if (opts.workerChoiceStrategyOptions != null) {
    this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
  }

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is on.
  if (!this.opts.enableTasksQueue) {
    return
  }
  checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
    opts.tasksQueueOptions as TasksQueueOptions
  )
}
239
0d80593b
JB
/**
 * Validates worker choice strategy options. Nullish options are accepted as is.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Nothing to validate when no options are given.
  if (workerChoiceStrategyOptions == null) {
    return
  }
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  // One weight is expected per worker node the pool can hold.
  const expectedWeights =
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  if (weights != null && Object.keys(weights).length !== expectedWeights) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
271
/**
 * Creates the pool event emitter, named after the pool type and the worker type.
 */
private initializeEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new EventEmitterAsyncResource({ name })
}
277
08f3f44c 278 /** @inheritDoc */
6b27d407
JB
public get info (): PoolInfo {
  type PoolWorkerNode = IWorkerNode<Worker, Data>

  // Adds up one numeric figure over every worker node.
  const sumOverWorkerNodes = (
    figure: (workerNode: PoolWorkerNode) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + figure(workerNode),
      0
    )

  // Counts the worker nodes matching a predicate.
  const countWorkerNodes = (
    matches: (workerNode: PoolWorkerNode, workerNodeKey: number) => boolean
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode, workerNodeKey) =>
        matches(workerNode, workerNodeKey) ? total + 1 : total,
      0
    )

  // Builds the pool wide statistics of one time measurement. Average and
  // median are only present when the worker choice strategy requires them.
  const measurementStatistics = (
    measurement: 'runTime' | 'waitTime'
  ): NonNullable<PoolInfo['runTime']> => {
    const mergedHistory = (): number[] =>
      this.workerNodes.reduce<number[]>(
        (merged, workerNode) =>
          merged.concat(workerNode.usage[measurement].history),
        []
      )
    return {
      minimum: round(
        min(
          ...this.workerNodes.map(
            workerNode => workerNode.usage[measurement]?.minimum ?? Infinity
          )
        )
      ),
      maximum: round(
        max(
          ...this.workerNodes.map(
            workerNode => workerNode.usage[measurement]?.maximum ?? -Infinity
          )
        )
      ),
      ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()[
        measurement
      ].average && {
        average: round(average(mergedHistory()))
      }),
      ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()[
        measurement
      ].median && {
        median: round(median(mergedHistory()))
      })
    }
  }

  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minimumNumberOfWorkers,
    maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
    // Utilization needs both the run time and the wait time aggregates.
    ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
      .runTime.aggregate &&
      this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
        .waitTime.aggregate && { utilization: round(this.utilization) }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    ...(this.opts.enableTasksQueue === true && {
      stealingWorkerNodes: countWorkerNodes(
        workerNode => workerNode.info.stealing
      )
    }),
    busyWorkerNodes: countWorkerNodes((_workerNode, workerNodeKey) =>
      this.isWorkerNodeBusy(workerNodeKey)
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    // Queue related figures are only reported when the tasks queue is enabled.
    ...(this.opts.enableTasksQueue === true && {
      queuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.queued
      )
    }),
    ...(this.opts.enableTasksQueue === true && {
      maxQueuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks?.maxQueued ?? 0
      )
    }),
    ...(this.opts.enableTasksQueue === true && {
      backPressure: this.hasBackPressure()
    }),
    ...(this.opts.enableTasksQueue === true && {
      stolenTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.stolen
      )
    }),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
      .runTime.aggregate && {
      runTime: measurementStatistics('runTime')
    }),
    ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
      .waitTime.aggregate && {
      waitTime: measurementStatistics('waitTime')
    })
  }
}
08f3f44c 440
aa9eede8
JB
441 /**
442 * The pool readiness boolean status.
443 */
2431bdb4
JB
private get ready (): boolean {
  // Only non dynamic worker nodes that are ready count towards readiness.
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyStaticWorkerNodes.length >= this.minimumNumberOfWorkers
}
455
afe0d5bf 456 /**
aa9eede8 457 * The approximate pool utilization.
afe0d5bf
JB
458 *
459 * @returns The pool utilization.
460 */
private get utilization (): number {
  // Time elapsed since the start timestamp, multiplied by the largest number
  // of workers the pool can hold.
  const poolTimeCapacity =
    (performance.now() - this.startTimestamp) *
    (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
  // Run and wait times are accumulated separately and only added at the end.
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
477
8881ae32 478 /**
aa9eede8 479 * The pool type.
8881ae32
JB
480 *
481 * If it is `'dynamic'`, it provides the `max` property.
482 */
483 protected abstract get type (): PoolType
484
184855e6 485 /**
aa9eede8 486 * The worker type.
184855e6
JB
487 */
488 protected abstract get worker (): WorkerType
489
6b813701
JB
490 /**
491 * Checks if the worker id sent in the received message from a worker is valid.
492 *
493 * @param message - The received message.
494 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
495 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  // The worker id must belong to one of the pool worker nodes.
  const isKnownWorker = this.getWorkerNodeKeyByWorkerId(workerId) !== -1
  if (!isKnownWorker) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
505
aa9eede8
JB
506 /**
507 * Gets the worker node key given its worker id.
508 *
509 * @param workerId - The worker id.
aad6fb64 510 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
aa9eede8 511 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  // The key of a worker node is its index in the worker nodes array.
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
517
afc003b2 518 /** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Every worker node restarts from a clean usage and is sent a statistics
  // message.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
536
537 /** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Nullish options leave the currently stored options untouched.
  const optionsProvided = workerChoiceStrategyOptions != null
  if (optionsProvided) {
    this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  }
  // The context always receives the options stored on the pool.
  const { workerChoiceStrategyOptions: storedOptions } = this.opts
  this.workerChoiceStrategyContext.setOptions(this, storedOptions)
}
550
a20f0ba5 551 /** @inheritDoc */
8f52842f
JB
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  // Switching the queue off: drop the stealing listeners, then flush the
  // tasks queues.
  if (wasEnabled && !enable) {
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
564
565 /** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  // Without a tasks queue, any stored tasks queue options are removed.
  if (this.opts.enableTasksQueue !== true) {
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }

  checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions =
    this.buildTasksQueueOptions(tasksQueueOptions)
  this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)

  // The listeners are always removed first, so that enabling an already
  // enabled feature does not register them twice.
  this.unsetTaskStealing()
  if (this.opts.tasksQueueOptions.taskStealing === true) {
    this.setTaskStealing()
  }
  this.unsetTasksStealingOnBackPressure()
  if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  }
}
588
9b38ab2d
JB
/**
 * Builds tasks queue options from the default ones, overridden by the given ones.
 *
 * @param tasksQueueOptions - The tasks queue options taking precedence over the defaults.
 * @returns The merged tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const defaultTasksQueueOptions = getDefaultTasksQueueOptions(
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  )
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
599
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param size - The value assigned to `tasksQueueBackPressureSize` on each worker node.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
605
d6ca1416
JB
/**
 * Registers the idle worker node event handler on every worker node.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('idleWorkerNode', this.handleIdleWorkerNodeEvent)
  }
}
614
/**
 * Removes the idle worker node event handler from every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('idleWorkerNode', this.handleIdleWorkerNodeEvent)
  }
}
623
/**
 * Registers the back pressure event handler on every worker node.
 */
private setTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('backPressure', this.handleBackPressureEvent)
  }
}
632
/**
 * Removes the back pressure event handler from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('backPressure', this.handleBackPressureEvent)
  }
}
641
c319c66b
JB
642 /**
643 * Whether the pool is full or not.
644 *
645 * The pool filling boolean status.
646 */
protected get full (): boolean {
  // Without a maximum, the minimum number of workers is the pool capacity.
  const capacity = this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  return this.workerNodes.length >= capacity
}
c2ade475 653
c319c66b
JB
654 /**
655 * Whether the pool is busy or not.
656 *
657 * The pool busyness boolean status.
658 */
659 protected abstract get busy (): boolean
7c0ba920 660
6c6afb84 661 /**
3d76750a 662 * Whether worker nodes are concurrently executing their tasks quota or not.
6c6afb84
JB
663 *
664 * @returns Worker nodes busyness boolean status.
665 */
protected internalBusy (): boolean {
  // A ready worker node has spare capacity when it executes fewer tasks than
  // the tasks queue concurrency, or no task at all without a tasks queue.
  const hasSpareCapacity =
    this.opts.enableTasksQueue === true
      ? (workerNode: IWorkerNode<Worker, Data>): boolean =>
          workerNode.info.ready &&
          workerNode.usage.tasks.executing <
            (this.opts.tasksQueueOptions?.concurrency as number)
      : (workerNode: IWorkerNode<Worker, Data>): boolean =>
          workerNode.info.ready && workerNode.usage.tasks.executing === 0
  // Busy means that no worker node has spare capacity.
  return !this.workerNodes.some(hasSpareCapacity)
}
684
42c677c1
JB
/**
 * Whether the worker node with the given key is busy or not.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node executes at least as many tasks as the tasks queue concurrency, or at least one task when the tasks queue is disabled.
 */
private isWorkerNodeBusy (workerNodeKey: number): boolean {
  const { executing } = this.workerNodes[workerNodeKey].usage.tasks
  return this.opts.enableTasksQueue === true
    ? executing >= (this.opts.tasksQueueOptions?.concurrency as number)
    : executing > 0
}
694
/**
 * Sends a task function operation to one worker node and waits for its answer.
 *
 * @param workerNodeKey - The key of the worker node the operation is sent to.
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` when the worker reports a successful operation, rejected when it reports a failed one.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const onOperationResponse = (response: MessageValue<Response>): void => {
      this.checkMessageWorkerId(response)
      const expectedWorkerId = this.getWorkerInfo(workerNodeKey).id as number
      // Ignore messages that carry no operation status or come from another
      // worker.
      if (
        response.taskFunctionOperationStatus == null ||
        response.workerId !== expectedWorkerId
      ) {
        return
      }
      if (response.taskFunctionOperationStatus) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${
              response.taskFunctionOperation as string
            }' failed on worker ${response.workerId} with error: '${
              response.workerError?.message as string
            }'`
          )
        )
      }
      // The answer has been handled: the listener is no longer needed.
      this.deregisterWorkerMessageListener(
        this.getWorkerNodeKeyByWorkerId(response.workerId),
        onOperationResponse
      )
    }
    this.registerWorkerMessageListener(workerNodeKey, onOperationResponse)
    this.sendToWorker(workerNodeKey, message)
  })
}
735
/**
 * Sends a task function operation to every worker node and waits until as
 * many operation statuses as there are worker nodes have been received.
 *
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` when every received status is a success, rejected with the first failed response otherwise. Resolves with `true` immediately when the pool has no worker node.
 */
private async sendTaskFunctionOperationToWorkers (
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    // Without any worker node nothing is sent and nothing can answer: settle
    // right away instead of leaving the promise pending forever.
    if (this.workerNodes.length === 0) {
      resolve(true)
      return
    }
    const responsesReceived = new Array<MessageValue<Response>>()
    const taskFunctionOperationsListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      // Only messages carrying an operation status are answers.
      if (response.taskFunctionOperationStatus == null) {
        return
      }
      responsesReceived.push(response)
      if (responsesReceived.length !== this.workerNodes.length) {
        return
      }
      // The listener was registered on every worker node, so it must be
      // removed from every worker node, not only from the last responder.
      for (const [workerNodeKey] of this.workerNodes.entries()) {
        this.deregisterWorkerMessageListener(
          workerNodeKey,
          taskFunctionOperationsListener
        )
      }
      const errorResponse = responsesReceived.find(
        received => received.taskFunctionOperationStatus === false
      )
      if (errorResponse == null) {
        resolve(true)
        return
      }
      reject(
        new Error(
          `Task function operation '${
            response.taskFunctionOperation as string
          }' failed on worker ${
            errorResponse.workerId as number
          } with error: '${errorResponse.workerError?.message as string}'`
        )
      )
    }
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.registerWorkerMessageListener(
        workerNodeKey,
        taskFunctionOperationsListener
      )
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
790
791 /** @inheritDoc */
public hasTaskFunction (name: string): boolean {
  // True as soon as one worker node lists the task function name.
  return this.workerNodes.some(
    ({ info }) =>
      Array.isArray(info.taskFunctionNames) &&
      info.taskFunctionNames.includes(name)
  )
}
803
804 /** @inheritDoc */
e81c38f2
JB
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof fn !== 'function') {
    throw new TypeError('fn argument must be a function')
  }
  // The function is sent to the workers as its source text.
  const operation: MessageValue<Data> = {
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: fn.toString()
  }
  const opResult = await this.sendTaskFunctionOperationToWorkers(operation)
  // Only reached when the operation did not reject.
  this.taskFunctions.set(name, fn)
  return opResult
}
826
827 /** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
  // Only task functions recorded in the pool side map can be removed.
  const isHandledByPool = this.taskFunctions.has(name)
  if (!isHandledByPool) {
    throw new Error(
      'Cannot remove a task function not handled on the pool side'
    )
  }
  const operation: MessageValue<Data> = {
    taskFunctionOperation: 'remove',
    taskFunctionName: name
  }
  const opResult = await this.sendTaskFunctionOperationToWorkers(operation)
  // Only reached when the operation did not reject.
  this.deleteTaskFunctionWorkerUsages(name)
  this.taskFunctions.delete(name)
  return opResult
}
842
90d7d101 843 /** @inheritDoc */
public listTaskFunctionNames (): string[] {
  // The names come from the first worker node reporting a non empty list.
  const reportingWorkerNode = this.workerNodes.find(
    ({ info }) =>
      Array.isArray(info.taskFunctionNames) &&
      info.taskFunctionNames.length > 0
  )
  return reportingWorkerNode?.info.taskFunctionNames ?? []
}
855
6703b9f4 856 /** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const operation: MessageValue<Data> = {
    taskFunctionOperation: 'default',
    taskFunctionName: name
  }
  return await this.sendTaskFunctionOperationToWorkers(operation)
}
863
adee6053
JB
/**
 * Deletes the worker usage of the given task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
869
375f7504
JB
/**
 * Whether a task can be executed on the given worker node without being enqueued.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node tasks queue is empty and it executes fewer tasks than the tasks queue concurrency.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const { executing } = this.workerNodes[workerNodeKey].usage.tasks
  return executing < (this.opts.tasksQueueOptions?.concurrency as number)
}
877
afc003b2 878 /** @inheritDoc */
7d91a8cd
JB
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Returns the error of the first failing precondition, `undefined` when
    // the task can be accepted. The order of the checks is significant.
    const findRefusal = (): Error | undefined => {
      if (!this.started) {
        return new Error('Cannot execute a task on not started pool')
      }
      if (this.destroying) {
        return new Error('Cannot execute a task on destroying pool')
      }
      if (name != null && typeof name !== 'string') {
        return new TypeError('name argument must be a string')
      }
      if (name != null && name.trim().length === 0) {
        return new TypeError('name argument must not be an empty string')
      }
      if (transferList != null && !Array.isArray(transferList)) {
        return new TypeError('transferList argument must be an array')
      }
      return undefined
    }
    const refusal = findRefusal()
    if (refusal != null) {
      reject(refusal)
      return
    }

    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      taskId: randomUUID()
    }
    // Keep the promise callbacks under the task id, together with the chosen
    // worker node key and, when events are enabled, an async resource.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey,
      ...(this.emitter != null && {
        asyncResource: new AsyncResource('poolifier:task', {
          triggerAsyncId: this.emitter.asyncId,
          requireManualDestroy: true
        })
      })
    })

    // Execute right away without a tasks queue, or with one when the worker
    // node can take the task immediately; enqueue the task otherwise.
    const executeNow =
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.shallExecuteTask(workerNodeKey))
    if (executeNow) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
c97c7edb 941
47352846
JB
/** @inheritdoc */
public start (): void {
  if (this.started) {
    throw new Error('Cannot start an already started pool')
  }
  if (this.starting) {
    throw new Error('Cannot start an already starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot start a destroying pool')
  }
  this.starting = true
  // Dynamic worker nodes do not count towards the minimum number of workers.
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (countNonDynamicWorkerNodes() < this.minimumNumberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
  this.starting = false
  this.started = true
}
966
/** @inheritDoc */
public async destroy (): Promise<void> {
  if (!this.started) {
    throw new Error('Cannot destroy an already destroyed pool')
  }
  if (this.starting) {
    throw new Error('Cannot destroy an starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot destroy an already destroying pool')
  }
  this.destroying = true
  // Destroy every worker node concurrently and wait for all of them.
  const workerNodeDestructions = this.workerNodes.map(
    async (_workerNode, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodeDestructions)
  // The destroy event is emitted before the listeners are removed.
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.emitter?.emitDestroy()
  this.emitter?.removeAllListeners()
  this.readyEventEmitted = false
  this.destroying = false
  this.started = false
}
991
/**
 * Sends a kill message to the worker given its worker node key and waits for its answer.
 * Resolves immediately if there is no worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 */
private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    if (this.workerNodes?.[workerNodeKey] == null) {
      resolve()
      return
    }
    const killMessageListener = (message: MessageValue<Response>): void => {
      this.checkMessageWorkerId(message)
      // Messages whose kill property is neither 'success' nor 'failure' are ignored.
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(
            new Error(
              `Kill message handling failed on worker ${
                message.workerId as number
              }`
            )
          )
          break
      }
    }
    // FIXME: should be registered only once
    this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
    this.sendToWorker(workerNodeKey, { kill: true })
  })
}
1017
/**
 * Terminates the worker node given its worker node key.
 *
 * The worker node is flagged as not ready and its tasks queue is flushed, then the
 * method waits for the 'taskFinished' events (bounded by the tasks finished timeout)
 * before sending the kill message and terminating the worker node.
 *
 * @param workerNodeKey - The worker node key.
 */
protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
  this.flagWorkerNodeAsNotReady(workerNodeKey)
  const flushedTasks = this.flushTasksQueue(workerNodeKey)
  const workerNode = this.workerNodes[workerNodeKey]
  // Fall back to the default tasks queue options when no timeout is configured.
  const tasksFinishedTimeout =
    this.opts.tasksQueueOptions?.tasksFinishedTimeout ??
    getDefaultTasksQueueOptions(
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    ).tasksFinishedTimeout
  await waitWorkerNodeEvents(
    workerNode,
    'taskFinished',
    flushedTasks,
    tasksFinishedTimeout
  )
  await this.sendKillMessageToWorker(workerNodeKey)
  await workerNode.terminate()
}
c97c7edb 1039
/**
 * Setup hook to execute code before worker nodes are created in the abstract constructor.
 * Does nothing by default; subclasses can override it.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
c97c7edb 1049
/**
 * Should return whether the worker is the main worker or not.
 *
 * @returns `true` if the worker is the main worker, `false` otherwise.
 */
protected abstract isMain (): boolean
1054
/**
 * Hook executed before the worker task execution.
 * Increments the executing tasks counter and updates the wait time statistics of the
 * worker usage and, when applicable, of the task function worker usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerUsage != null) {
    ++workerUsage.tasks.executing
    updateWaitTimeWorkerUsage(
      this.workerChoiceStrategyContext,
      workerUsage,
      task
    )
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskName = task.name as string
  if (
    this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(taskName) ==
    null
  ) {
    return
  }
  const taskFunctionWorkerUsage = this.workerNodes[
    workerNodeKey
  ].getTaskFunctionWorkerUsage(taskName) as WorkerUsage
  ++taskFunctionWorkerUsage.tasks.executing
  updateWaitTimeWorkerUsage(
    this.workerChoiceStrategyContext,
    taskFunctionWorkerUsage,
    task
  )
}
1092
/**
 * Hook executed after the worker task execution.
 * Updates the task, run time and ELU statistics of the worker usage and, when
 * applicable, of the task function worker usage, then updates the worker choice
 * strategy context if any usage was updated.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  let needWorkerChoiceStrategyUpdate = false
  // Applies the received message statistics to the given usage.
  const updateUsage = (usage: WorkerUsage): void => {
    updateTaskStatisticsWorkerUsage(usage, message)
    updateRunTimeWorkerUsage(this.workerChoiceStrategyContext, usage, message)
    updateEluWorkerUsage(this.workerChoiceStrategyContext, usage, message)
    needWorkerChoiceStrategyUpdate = true
  }
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerUsage != null) {
    updateUsage(workerUsage)
  }
  const taskName = message.taskPerformance?.name as string
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(taskName) !=
      null
  ) {
    updateUsage(
      this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
        taskName
      ) as WorkerUsage
    )
  }
  if (needWorkerChoiceStrategyUpdate) {
    this.workerChoiceStrategyContext.update(workerNodeKey)
  }
}
1148
db0e38ee
JB
/**
 * Whether the worker node shall update its task function worker usage or not.
 * This is the case when the worker information exists and lists more than two
 * task function names.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionNames =
    this.getWorkerInfo(workerNodeKey)?.taskFunctionNames
  return Array.isArray(taskFunctionNames) && taskFunctionNames.length > 2
}
1163
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker node is created first when the pool conditions allow it; it is
 * returned directly if the strategy policy asks to use dynamic workers, otherwise
 * the choice is delegated to the worker choice strategy.
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (this.shallCreateDynamicWorker()) {
    const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
    const { dynamicWorkerUsage } =
      this.workerChoiceStrategyContext.getStrategyPolicy()
    if (dynamicWorkerUsage) {
      return dynamicWorkerNodeKey
    }
  }
  return this.workerChoiceStrategyContext.execute()
}
1182
6c6afb84
JB
/**
 * Conditions for dynamic worker creation.
 *
 * @returns `true` if a dynamic worker shall be created, `false` otherwise.
 */
protected abstract shallCreateDynamicWorker (): boolean
c97c7edb 1189
/**
 * Sends a message to the worker given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message to send.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1202
/**
 * Creates a new, completely set up worker node.
 *
 * User supplied handlers from the pool options are registered first, followed by the
 * pool internal 'error' and 'exit' handlers, before the worker node is added to the pool.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const workerNode = this.createWorkerNode()
  // Pool internal error handling: flag the node, optionally replace it,
  // move its queued tasks elsewhere and terminate it.
  const handleWorkerError = (error: Error): void => {
    workerNode.info.ready = false
    this.emitter?.emit(PoolEvents.error, error)
    if (
      this.started &&
      !this.destroying &&
      this.opts.restartWorkerOnError === true
    ) {
      if (workerNode.info.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (
      this.started &&
      !this.destroying &&
      this.opts.enableTasksQueue === true
    ) {
      this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
    }
    workerNode.terminate().catch(terminationError => {
      this.emitter?.emit(PoolEvents.error, terminationError)
    })
  }
  // Once the worker has exited, its node is removed from the pool.
  const handleWorkerExit = (): void => {
    this.removeWorkerNode(workerNode)
  }
  workerNode.registerWorkerEventHandler(
    'online',
    this.opts.onlineHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'message',
    this.opts.messageHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'error',
    this.opts.errorHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler('error', handleWorkerError)
  workerNode.registerWorkerEventHandler(
    'exit',
    this.opts.exitHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerOnceWorkerEventHandler('exit', handleWorkerExit)
  const workerNodeKey = this.addWorkerNode(workerNode)
  this.afterWorkerNodeSetup(workerNodeKey)
  return workerNodeKey
}
be0676b3 1258
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular worker node setup, a message listener destroys the worker
 * node when a matching kill message is received, the registered task functions are
 * sent to the worker and the worker information is flagged as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  const killMessageListener = (message: MessageValue<Response>): void => {
    this.checkMessageWorkerId(message)
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    // A soft kill is only honored once nothing is executing and, with the
    // tasks queue enabled, nothing is queued either.
    const isIdle = (): boolean =>
      workerUsage.tasks.executing === 0 &&
      (this.opts.enableTasksQueue === false ||
        (this.opts.enableTasksQueue === true &&
          this.tasksQueueSize(localWorkerNodeKey) === 0))
    // Kill message received from worker
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && isIdle())
    ) {
      // Flag the worker node as not ready immediately
      this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
      this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  }
  this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  // Send every task function registered on the pool to the new worker.
  for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
    this.sendTaskFunctionOperationToWorker(workerNodeKey, {
      taskFunctionOperation: 'add',
      taskFunctionName,
      taskFunction: taskFunction.toString()
    }).catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  }
  workerInfo.dynamic = true
  const strategyPolicy = this.workerChoiceStrategyContext.getStrategyPolicy()
  if (strategyPolicy.dynamicWorkerReady || strategyPolicy.dynamicWorkerUsage) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1314
/**
 * Registers a message listener callback on the worker given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053 1327
ae036c3e
JB
/**
 * Registers once a message listener callback on the worker given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1340
/**
 * Deregisters a message listener callback on the worker given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1353
/**
 * Method hooked up after a worker node has been newly created.
 * Registers the worker message listener, sends the startup and statistics messages
 * and, when the tasks queue is enabled, wires the task stealing event handlers.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerMessageListener)
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const { taskStealing, tasksStealingOnBackPressure } =
    this.opts.tasksQueueOptions ?? {}
  if (taskStealing === true) {
    this.workerNodes[workerNodeKey].on(
      'idleWorkerNode',
      this.handleIdleWorkerNodeEvent
    )
  }
  if (tasksStealingOnBackPressure === true) {
    this.workerNodes[workerNodeKey].on(
      'backPressure',
      this.handleBackPressureEvent
    )
  }
}
1385
/**
 * Sends the startup message to the worker given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1392
/**
 * Sends the statistics message to the worker given its worker node key.
 * The message carries the run time and ELU aggregate requirements of the
 * worker choice strategy context.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTime.aggregate,
      elu: elu.aggregate
    }
  })
}
a2ed5053 1409
5eb72b9e
JB
/**
 * Whether task stealing is impossible: the pool has at most one worker node
 * or no task is queued.
 *
 * @returns `true` if no task can be stolen, `false` otherwise.
 */
private cannotStealTask (): boolean {
  if (this.workerNodes.length <= 1) {
    return true
  }
  return this.info.queuedTasks === 0
}
1413
42c677c1
JB
/**
 * Executes the task on the worker node if it shall execute it, enqueues it otherwise.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to handle.
 */
private handleTask (workerNodeKey: number, task: Task<Data>): void {
  if (!this.shallExecuteTask(workerNodeKey)) {
    this.enqueueTask(workerNodeKey, task)
    return
  }
  this.executeTask(workerNodeKey, task)
}
1421
/**
 * Moves every queued task of the given worker node to the other worker nodes.
 *
 * For each task, the destination is the least queued worker node other than the
 * source one, ready worker nodes being preferred over not ready ones. The source
 * worker node is never selected: handing a task back to it would either re-enqueue
 * it endlessly or send it to the worker being drained.
 *
 * @param workerNodeKey - The key of the worker node whose tasks queue is redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  if (workerNodeKey === -1 || this.cannotStealTask()) {
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = -1
    for (const [candidateKey, candidate] of this.workerNodes.entries()) {
      if (candidateKey === workerNodeKey) {
        continue
      }
      const currentBest = this.workerNodes[destinationWorkerNodeKey]
      if (
        currentBest == null ||
        (candidate.info.ready && !currentBest.info.ready) ||
        (candidate.info.ready === currentBest.info.ready &&
          candidate.usage.tasks.queued < currentBest.usage.tasks.queued)
      ) {
        destinationWorkerNodeKey = candidateKey
      }
    }
    if (destinationWorkerNodeKey === -1) {
      // No other worker node can take over the tasks: stop instead of looping forever.
      break
    }
    this.handleTask(
      destinationWorkerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
1443
b1838604
JB
/**
 * Increments the stolen tasks counter of the worker usage and, when applicable,
 * of the task function worker usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The stolen task name.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    ++workerNode.usage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (workerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
    taskName
  ) as WorkerUsage
  ++taskFunctionWorkerUsage.tasks.stolen
}
1462
/**
 * Increments the sequentially stolen tasks counter of the worker usage, if any.
 *
 * @param workerNodeKey - The worker node key.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerUsage != null) {
    ++workerUsage.tasks.sequentiallyStolen
  }
}
1471
/**
 * Increments the sequentially stolen tasks counter of the task function worker usage,
 * when the worker node tracks task function usages and one exists for the task name.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task name.
 */
private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (workerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
    taskName
  ) as WorkerUsage
  ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
}
1487
/**
 * Resets to zero the sequentially stolen tasks counter of the worker usage, if any.
 *
 * @param workerNodeKey - The worker node key.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerUsage != null) {
    workerUsage.tasks.sequentiallyStolen = 0
  }
}
1496
/**
 * Resets to zero the sequentially stolen tasks counter of the task function worker usage,
 * when the worker node tracks task function usages and one exists for the task name.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task name.
 */
private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (workerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
    taskName
  ) as WorkerUsage
  taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
}
1512
/**
 * Handler of the 'idleWorkerNode' worker node event: makes the idle worker node
 * steal a task, then re-invokes itself after a delay computed from the number of
 * sequentially stolen tasks.
 *
 * @param eventDetail - The worker node event detail; its workerNodeKey property must be defined.
 * @param previousStolenTask - The task stolen by the previous invocation, if any.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the event detail has no worker node key.
 */
private readonly handleIdleWorkerNodeEvent = (
  eventDetail: WorkerNodeEventDetail,
  previousStolenTask?: Task<Data>
): void => {
  const { workerNodeKey } = eventDetail
  if (workerNodeKey == null) {
    throw new Error(
      'WorkerNode event detail workerNodeKey property must be defined'
    )
  }
  // Stop when nothing can be stolen or when too many worker nodes are already stealing.
  const tooManyStealingWorkerNodes = (): boolean =>
    (this.info.stealingWorkerNodes as number) >
    Math.floor(this.workerNodes.length / 2)
  if (this.cannotStealTask() || tooManyStealingWorkerNodes()) {
    if (previousStolenTask != null) {
      this.getWorkerInfo(workerNodeKey).stealing = false
    }
    return
  }
  const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
  // The worker node got work again after stealing: end the stealing sequence.
  const stealingSequenceEnded =
    previousStolenTask != null &&
    workerNodeTasksUsage.sequentiallyStolen > 0 &&
    (workerNodeTasksUsage.executing > 0 ||
      this.tasksQueueSize(workerNodeKey) > 0)
  if (stealingSequenceEnded) {
    this.getWorkerInfo(workerNodeKey).stealing = false
    const taskFunctionNames = this.workerNodes[workerNodeKey].info
      .taskFunctionNames as string[]
    for (const taskName of taskFunctionNames) {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        taskName
      )
    }
    this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    return
  }
  this.getWorkerInfo(workerNodeKey).stealing = true
  const stolenTask = this.workerNodeStealTask(workerNodeKey)
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    stolenTask != null
  ) {
    const stolenTaskName = stolenTask.name as string
    const taskFunctionTasksWorkerUsage = this.workerNodes[
      workerNodeKey
    ].getTaskFunctionWorkerUsage(stolenTaskName)?.tasks as TaskStatistics
    // The counter keeps growing only while the same task function is stolen in a row.
    const continuesSequence =
      taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
      (previousStolenTask != null &&
        previousStolenTask.name === stolenTask.name &&
        taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
    if (continuesSequence) {
      this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        stolenTaskName
      )
    } else {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        stolenTaskName
      )
    }
  }
  // Errors raised by the delayed re-invocation are deliberately swallowed.
  sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
    .then(() => {
      this.handleIdleWorkerNodeEvent(eventDetail, stolenTask)
      return undefined
    })
    .catch(EMPTY_FUNCTION)
}
1585
/**
 * Steals a task for the given worker node from the most queued eligible worker node.
 *
 * A source worker node is eligible when it is ready, not stealing, has queued tasks
 * and is not the stealing worker node itself. The stealing worker node is excluded by
 * identity: the candidates are a sorted copy of the worker nodes, so positions in that
 * copy are not worker node keys.
 *
 * @param workerNodeKey - The key of the worker node stealing a task.
 * @returns The stolen task, `undefined` if no task could be stolen.
 */
private readonly workerNodeStealTask = (
  workerNodeKey: number
): Task<Data> | undefined => {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  const sourceWorkerNode = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
    )
    .find(
      candidate =>
        candidate !== stealingWorkerNode &&
        candidate.info.ready &&
        !candidate.info.stealing &&
        candidate.usage.tasks.queued > 0
    )
  if (sourceWorkerNode == null) {
    return undefined
  }
  const task = sourceWorkerNode.popTask() as Task<Data>
  this.handleTask(workerNodeKey, task)
  this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
  this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name as string)
  return task
}
1613
/**
 * Handler of the 'backPressure' worker node event: hands tasks queued on the worker
 * node under back pressure to the other, least queued, worker nodes.
 *
 * Candidates are sorted together with their real worker node key, so the task and
 * the stealing flag go to the selected worker node and not to whichever node shares
 * its position in the sorted list.
 *
 * @param eventDetail - The worker node event detail carrying the id of the worker under back pressure.
 */
private readonly handleBackPressureEvent = (
  eventDetail: WorkerNodeEventDetail
): void => {
  if (
    this.cannotStealTask() ||
    (this.info.stealingWorkerNodes as number) >
      Math.floor(this.workerNodes.length / 2)
  ) {
    return
  }
  const { workerId } = eventDetail
  const sizeOffset = 1
  const tasksQueueSize = this.opts.tasksQueueOptions?.size as number
  if (tasksQueueSize <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  if (sourceWorkerNode == null) {
    // The worker under back pressure is no longer part of the pool.
    return
  }
  const candidates = Array.from(this.workerNodes.entries()).sort(
    ([, workerNodeA], [, workerNodeB]) =>
      workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
  )
  for (const [workerNodeKey, workerNode] of candidates) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      !workerNode.info.stealing &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued < tasksQueueSize - sizeOffset
    ) {
      workerNode.info.stealing = true
      const task = sourceWorkerNode.popTask() as Task<Data>
      this.handleTask(workerNodeKey, task)
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
      workerNode.info.stealing = false
    }
  }
}
1657
/**
 * This method is the message listener registered on each worker.
 * It dispatches the message according to the properties it carries: worker ready
 * response, task execution response or task function names update.
 *
 * @param message - The message received from the worker.
 */
protected readonly workerMessageListener = (
  message: MessageValue<Response>
): void => {
  this.checkMessageWorkerId(message)
  const { workerId, ready, taskId, taskFunctionNames } = message
  if (ready != null && taskFunctionNames != null) {
    // Worker ready response received from worker
    this.handleWorkerReadyResponse(message)
    return
  }
  if (taskId != null) {
    // Task execution response received from worker
    this.handleTaskExecutionResponse(message)
    return
  }
  if (taskFunctionNames != null) {
    // Task function names message received from worker
    const workerInfo = this.getWorkerInfo(
      this.getWorkerNodeKeyByWorkerId(workerId)
    )
    workerInfo.taskFunctionNames = taskFunctionNames
  }
}
1679
/**
 * Handles the worker ready response: stores the ready flag and the task function
 * names in the worker information and emits the pool ready event once the pool is ready.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports that it is not ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { workerId, ready, taskFunctionNames } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId as number} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = ready as boolean
  workerInfo.taskFunctionNames = taskFunctionNames
  // The ready event is emitted at most once until the pool is destroyed.
  if (this.readyEventEmitted || !this.ready) {
    return
  }
  this.readyEventEmitted = true
  this.emitter?.emit(PoolEvents.ready, this.info)
}
1695
/**
 * Handles a task execution response: settles the promise of the task, updates the
 * usage statistics and, when the tasks queue is enabled, runs the next queued task
 * or signals that the worker node is idle.
 *
 * Responses whose task id has no pending promise are ignored. The tasks queue
 * handling is skipped when the worker node is no longer part of the pool.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { workerId, taskId, workerError, data } = message
  const promiseResponse = this.promiseResponseMap.get(taskId as string)
  if (promiseResponse == null) {
    return
  }
  const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerError != null) {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    if (asyncResource != null) {
      asyncResource.runInAsyncScope(reject, this.emitter, workerError.message)
    } else {
      reject(workerError.message)
    }
  } else if (asyncResource != null) {
    asyncResource.runInAsyncScope(resolve, this.emitter, data)
  } else {
    resolve(data as Response)
  }
  asyncResource?.emitDestroy()
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId as string)
  workerNode?.emit('taskFinished', taskId)
  if (
    this.opts.enableTasksQueue !== true ||
    this.destroying ||
    workerNode == null
  ) {
    return
  }
  const workerNodeTasksUsage = workerNode.usage.tasks
  if (
    this.tasksQueueSize(workerNodeKey) > 0 &&
    workerNodeTasksUsage.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  ) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  if (
    workerNodeTasksUsage.executing === 0 &&
    this.tasksQueueSize(workerNodeKey) === 0 &&
    workerNodeTasksUsage.sequentiallyStolen === 0
  ) {
    workerNode.emit('idleWorkerNode', {
      workerId: workerId as number,
      workerNodeKey
    })
  }
}
7c0ba920 1745
/**
 * Emits the pool busy event when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1751
/**
 * Emits the pool back pressure event when the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1757
d0878034
JB
/**
 * Checks the pool state and emits the dynamic worker creation events.
 */
protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
33e6bb4c 1762
/**
 * Gets the worker information given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information; evaluates to `undefined` when no worker node exists at the given key.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode?.info
}
1772
/**
 * Creates a worker node.
 * The tasks queue back pressure size falls back to the default tasks queue options
 * when it is not configured.
 *
 * @returns The created worker node.
 */
private createWorkerNode (): IWorkerNode<Worker, Data> {
  const tasksQueueBackPressureSize =
    this.opts.tasksQueueOptions?.size ??
    getDefaultTasksQueueOptions(
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    ).size
  const workerNode = new WorkerNode<Worker, Data>(this.worker, this.filePath, {
    env: this.opts.env,
    workerOptions: this.opts.workerOptions,
    tasksQueueBackPressureSize
  })
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    workerNode.info.ready = true
  }
  return workerNode
}
1798
/**
 * Adds the given worker node in the pool worker nodes.
 *
 * @param workerNode - The worker node.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
  this.workerNodes.push(workerNode)
  const addedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
c923ce56 1814
/**
 * Removes the worker node from the pool worker nodes and from the worker choice
 * strategy context. Does nothing if the worker node is not part of the pool.
 *
 * @param workerNode - The worker node.
 */
private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
  const removedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
adc3c320 1827
/**
 * Flags the worker held by the worker node stored at the given key as not ready.
 *
 * Does nothing when no worker information is available for the given key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  // getWorkerInfo() resolves the worker node with optional chaining, so it can
  // yield undefined at runtime despite its declared return type: guard against
  // it instead of throwing a TypeError on the property assignment.
  if (workerInfo != null) {
    workerInfo.ready = false
  }
}
1831
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Without tasks queuing enabled, a worker node is never reported under back pressure.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1839
/**
 * Whether the pool has back pressure: tasks queuing is enabled and no worker
 * node without back pressure can be found.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1848
/**
 * Runs the given task on the worker held by the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  // The hook runs before the task is handed over to the worker.
  this.beforeTaskExecutionHook(workerNodeKey, task)
  this.sendToWorker(workerNodeKey, task, task.transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1860
/**
 * Enqueues the given task on the worker node stored at the given key, then
 * checks and emits the task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const sizeAfterEnqueue = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return sizeAfterEnqueue
}
1866
/**
 * Dequeues a task from the tasks queue of the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the worker node returned none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1870
/**
 * Gets the tasks queue size of the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1874
/**
 * Flushes the tasks queue of the worker node stored at the given key: every
 * queued task is dequeued and executed on that worker node, then the tasks
 * queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The number of flushed tasks.
 */
protected flushTasksQueue (workerNodeKey: number): number {
  let flushedTasks = 0
  for (; this.tasksQueueSize(workerNodeKey) > 0; ++flushedTasks) {
    // The queue size was just checked to be positive, hence the cast.
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
  return flushedTasks
}
1887
/**
 * Flushes the tasks queue of every worker node in the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 1893}