refactor: move message channel property from worker info to node
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
59317253 14 isKillBehavior,
0d80593b 15 isPlainObject,
afe0d5bf 16 median,
e4f20deb
JB
17 round,
18 updateMeasurementStatistics
bbeadd16 19} from '../utils'
59317253 20import { KillBehaviors } from '../worker/worker-options'
c4855468 21import {
65d7a1c9 22 type IPool,
7c5a1080 23 PoolEmitter,
c4855468 24 PoolEvents,
6b27d407 25 type PoolInfo,
c4855468 26 type PoolOptions,
6b27d407
JB
27 type PoolType,
28 PoolTypes,
4b628b48 29 type TasksQueueOptions
c4855468 30} from './pool'
bbfa38a2
JB
31import type {
32 IWorker,
33 IWorkerNode,
34 WorkerInfo,
35 WorkerType,
36 WorkerUsage
e102732c 37} from './worker'
a35560ba 38import {
008512c7 39 type MeasurementStatisticsRequirements,
f0d7f803 40 Measurements,
a35560ba 41 WorkerChoiceStrategies,
a20f0ba5
JB
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
bdaf31cd
JB
44} from './selection-strategies/selection-strategies-types'
45import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 46import { version } from './version'
4b628b48 47import { WorkerNode } from './worker-node'
23ccf9d7 48
729c563d 49/**
ea7a90d3 50 * Base class that implements some shared logic for all poolifier pools.
729c563d 51 *
38e795c1 52 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 55 */
c97c7edb 56export abstract class AbstractPool<
f06e48d8 57 Worker extends IWorker,
d3c8a1a8
S
58 Data = unknown,
59 Response = unknown
c4855468 60> implements IPool<Worker, Data, Response> {
afc003b2 61 /** @inheritDoc */
4b628b48 62 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 63
afc003b2 64 /** @inheritDoc */
7c0ba920
JB
65 public readonly emitter?: PoolEmitter
66
be0676b3 67 /**
52b71763 68 * The task execution response promise map.
be0676b3 69 *
2740a743 70 * - `key`: The message id of each submitted task.
a3445496 71 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 72 *
a3445496 73 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 74 */
501aea93
JB
75 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
76 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 77
a35560ba 78 /**
51fe3d3c 79 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
82 Worker,
83 Data,
84 Response
a35560ba
S
85 >
86
075e51d1 87 /**
adc9cc64 88 * Whether the pool is starting or not.
075e51d1
JB
89 */
90 private readonly starting: boolean
afe0d5bf
JB
91 /**
92 * The start timestamp of the pool.
93 */
94 private readonly startTimestamp
95
729c563d
S
96 /**
97 * Constructs a new poolifier pool.
98 *
38e795c1 99 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 100 * @param filePath - Path to the worker file.
38e795c1 101 * @param opts - Options for the pool.
729c563d 102 */
c97c7edb 103 public constructor (
b4213b7f
JB
104 protected readonly numberOfWorkers: number,
105 protected readonly filePath: string,
106 protected readonly opts: PoolOptions<Worker>
c97c7edb 107 ) {
78cea37e 108 if (!this.isMain()) {
04f45163 109 throw new Error(
8c6d4acf 110 'Cannot start a pool from a worker with the same type as the pool'
04f45163 111 )
c97c7edb 112 }
8d3782fa 113 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 114 this.checkFilePath(this.filePath)
7c0ba920 115 this.checkPoolOptions(this.opts)
1086026a 116
7254e419
JB
117 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
118 this.executeTask = this.executeTask.bind(this)
119 this.enqueueTask = this.enqueueTask.bind(this)
10ecf8fd 120 this.dequeueTask = this.dequeueTask.bind(this)
7254e419 121 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
1086026a 122
6bd72cd0 123 if (this.opts.enableEvents === true) {
7c0ba920
JB
124 this.emitter = new PoolEmitter()
125 }
d59df138
JB
126 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
127 Worker,
128 Data,
129 Response
da309861
JB
130 >(
131 this,
132 this.opts.workerChoiceStrategy,
133 this.opts.workerChoiceStrategyOptions
134 )
b6b32453
JB
135
136 this.setupHook()
137
075e51d1 138 this.starting = true
e761c033 139 this.startPool()
075e51d1 140 this.starting = false
afe0d5bf
JB
141
142 this.startTimestamp = performance.now()
c97c7edb
S
143 }
144
a35560ba 145 private checkFilePath (filePath: string): void {
ffcbbad8
JB
146 if (
147 filePath == null ||
3d6dd312 148 typeof filePath !== 'string' ||
ffcbbad8
JB
149 (typeof filePath === 'string' && filePath.trim().length === 0)
150 ) {
c510fea7
APA
151 throw new Error('Please specify a file with a worker implementation')
152 }
3d6dd312
JB
153 if (!existsSync(filePath)) {
154 throw new Error(`Cannot find the worker file '${filePath}'`)
155 }
c510fea7
APA
156 }
157
8d3782fa
JB
158 private checkNumberOfWorkers (numberOfWorkers: number): void {
159 if (numberOfWorkers == null) {
160 throw new Error(
161 'Cannot instantiate a pool without specifying the number of workers'
162 )
78cea37e 163 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 164 throw new TypeError(
0d80593b 165 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
166 )
167 } else if (numberOfWorkers < 0) {
473c717a 168 throw new RangeError(
8d3782fa
JB
169 'Cannot instantiate a pool with a negative number of workers'
170 )
6b27d407 171 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
2431bdb4
JB
172 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
173 }
174 }
175
176 protected checkDynamicPoolSize (min: number, max: number): void {
079de991 177 if (this.type === PoolTypes.dynamic) {
a5ed75b7
JB
178 if (max == null) {
179 throw new Error(
180 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
181 )
182 } else if (!Number.isSafeInteger(max)) {
2761efb4
JB
183 throw new TypeError(
184 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
185 )
186 } else if (min > max) {
079de991
JB
187 throw new RangeError(
188 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
189 )
b97d82d8 190 } else if (max === 0) {
079de991 191 throw new RangeError(
d640b48b 192 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
079de991
JB
193 )
194 } else if (min === max) {
195 throw new RangeError(
196 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
197 )
198 }
8d3782fa
JB
199 }
200 }
201
7c0ba920 202 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
203 if (isPlainObject(opts)) {
204 this.opts.workerChoiceStrategy =
205 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
206 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
207 this.opts.workerChoiceStrategyOptions =
208 opts.workerChoiceStrategyOptions ??
209 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
49be33fe
JB
210 this.checkValidWorkerChoiceStrategyOptions(
211 this.opts.workerChoiceStrategyOptions
212 )
1f68cede 213 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
214 this.opts.enableEvents = opts.enableEvents ?? true
215 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
216 if (this.opts.enableTasksQueue) {
217 this.checkValidTasksQueueOptions(
218 opts.tasksQueueOptions as TasksQueueOptions
219 )
220 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
221 opts.tasksQueueOptions as TasksQueueOptions
222 )
223 }
224 } else {
225 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 226 }
aee46736
JB
227 }
228
229 private checkValidWorkerChoiceStrategy (
230 workerChoiceStrategy: WorkerChoiceStrategy
231 ): void {
232 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 233 throw new Error(
aee46736 234 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
235 )
236 }
7c0ba920
JB
237 }
238
0d80593b
JB
239 private checkValidWorkerChoiceStrategyOptions (
240 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
241 ): void {
242 if (!isPlainObject(workerChoiceStrategyOptions)) {
243 throw new TypeError(
244 'Invalid worker choice strategy options: must be a plain object'
245 )
246 }
49be33fe
JB
247 if (
248 workerChoiceStrategyOptions.weights != null &&
6b27d407 249 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
250 ) {
251 throw new Error(
252 'Invalid worker choice strategy options: must have a weight for each worker node'
253 )
254 }
f0d7f803
JB
255 if (
256 workerChoiceStrategyOptions.measurement != null &&
257 !Object.values(Measurements).includes(
258 workerChoiceStrategyOptions.measurement
259 )
260 ) {
261 throw new Error(
262 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
263 )
264 }
0d80593b
JB
265 }
266
a20f0ba5
JB
267 private checkValidTasksQueueOptions (
268 tasksQueueOptions: TasksQueueOptions
269 ): void {
0d80593b
JB
270 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
271 throw new TypeError('Invalid tasks queue options: must be a plain object')
272 }
f0d7f803
JB
273 if (
274 tasksQueueOptions?.concurrency != null &&
275 !Number.isSafeInteger(tasksQueueOptions.concurrency)
276 ) {
277 throw new TypeError(
278 'Invalid worker tasks concurrency: must be an integer'
279 )
280 }
281 if (
282 tasksQueueOptions?.concurrency != null &&
283 tasksQueueOptions.concurrency <= 0
284 ) {
a20f0ba5 285 throw new Error(
f0d7f803 286 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
a20f0ba5
JB
287 )
288 }
289 }
290
e761c033
JB
291 private startPool (): void {
292 while (
293 this.workerNodes.reduce(
294 (accumulator, workerNode) =>
295 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
296 0
297 ) < this.numberOfWorkers
298 ) {
aa9eede8 299 this.createAndSetupWorkerNode()
e761c033
JB
300 }
301 }
302
08f3f44c 303 /** @inheritDoc */
6b27d407
JB
304 public get info (): PoolInfo {
305 return {
23ccf9d7 306 version,
6b27d407 307 type: this.type,
184855e6 308 worker: this.worker,
2431bdb4
JB
309 ready: this.ready,
310 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
6b27d407
JB
311 minSize: this.minSize,
312 maxSize: this.maxSize,
c05f0d50
JB
313 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
314 .runTime.aggregate &&
1305e9a8
JB
315 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
316 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
317 workerNodes: this.workerNodes.length,
318 idleWorkerNodes: this.workerNodes.reduce(
319 (accumulator, workerNode) =>
f59e1027 320 workerNode.usage.tasks.executing === 0
a4e07f72
JB
321 ? accumulator + 1
322 : accumulator,
6b27d407
JB
323 0
324 ),
325 busyWorkerNodes: this.workerNodes.reduce(
326 (accumulator, workerNode) =>
f59e1027 327 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
328 0
329 ),
a4e07f72 330 executedTasks: this.workerNodes.reduce(
6b27d407 331 (accumulator, workerNode) =>
f59e1027 332 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
333 0
334 ),
335 executingTasks: this.workerNodes.reduce(
336 (accumulator, workerNode) =>
f59e1027 337 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
338 0
339 ),
daf86646
JB
340 ...(this.opts.enableTasksQueue === true && {
341 queuedTasks: this.workerNodes.reduce(
342 (accumulator, workerNode) =>
343 accumulator + workerNode.usage.tasks.queued,
344 0
345 )
346 }),
347 ...(this.opts.enableTasksQueue === true && {
348 maxQueuedTasks: this.workerNodes.reduce(
349 (accumulator, workerNode) =>
350 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
351 0
352 )
353 }),
a4e07f72
JB
354 failedTasks: this.workerNodes.reduce(
355 (accumulator, workerNode) =>
f59e1027 356 accumulator + workerNode.usage.tasks.failed,
a4e07f72 357 0
1dcf8b7b
JB
358 ),
359 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
360 .runTime.aggregate && {
361 runTime: {
98e72cda
JB
362 minimum: round(
363 Math.min(
364 ...this.workerNodes.map(
8ebe6c30 365 (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity
98e72cda 366 )
1dcf8b7b
JB
367 )
368 ),
98e72cda
JB
369 maximum: round(
370 Math.max(
371 ...this.workerNodes.map(
8ebe6c30 372 (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
98e72cda 373 )
1dcf8b7b 374 )
98e72cda
JB
375 ),
376 average: round(
377 this.workerNodes.reduce(
378 (accumulator, workerNode) =>
379 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
380 0
381 ) /
382 this.workerNodes.reduce(
383 (accumulator, workerNode) =>
384 accumulator + (workerNode.usage.tasks?.executed ?? 0),
385 0
386 )
387 ),
388 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
389 .runTime.median && {
390 median: round(
391 median(
392 this.workerNodes.map(
8ebe6c30 393 (workerNode) => workerNode.usage.runTime?.median ?? 0
98e72cda
JB
394 )
395 )
396 )
397 })
1dcf8b7b
JB
398 }
399 }),
400 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
401 .waitTime.aggregate && {
402 waitTime: {
98e72cda
JB
403 minimum: round(
404 Math.min(
405 ...this.workerNodes.map(
8ebe6c30 406 (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
98e72cda 407 )
1dcf8b7b
JB
408 )
409 ),
98e72cda
JB
410 maximum: round(
411 Math.max(
412 ...this.workerNodes.map(
8ebe6c30 413 (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
98e72cda 414 )
1dcf8b7b 415 )
98e72cda
JB
416 ),
417 average: round(
418 this.workerNodes.reduce(
419 (accumulator, workerNode) =>
420 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
421 0
422 ) /
423 this.workerNodes.reduce(
424 (accumulator, workerNode) =>
425 accumulator + (workerNode.usage.tasks?.executed ?? 0),
426 0
427 )
428 ),
429 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
430 .waitTime.median && {
431 median: round(
432 median(
433 this.workerNodes.map(
8ebe6c30 434 (workerNode) => workerNode.usage.waitTime?.median ?? 0
98e72cda
JB
435 )
436 )
437 )
438 })
1dcf8b7b
JB
439 }
440 })
6b27d407
JB
441 }
442 }
08f3f44c 443
aa9eede8
JB
444 /**
445 * The pool readiness boolean status.
446 */
2431bdb4
JB
447 private get ready (): boolean {
448 return (
b97d82d8
JB
449 this.workerNodes.reduce(
450 (accumulator, workerNode) =>
451 !workerNode.info.dynamic && workerNode.info.ready
452 ? accumulator + 1
453 : accumulator,
454 0
455 ) >= this.minSize
2431bdb4
JB
456 )
457 }
458
afe0d5bf 459 /**
aa9eede8 460 * The approximate pool utilization.
afe0d5bf
JB
461 *
462 * @returns The pool utilization.
463 */
464 private get utilization (): number {
8e5ca040 465 const poolTimeCapacity =
fe7d90db 466 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
467 const totalTasksRunTime = this.workerNodes.reduce(
468 (accumulator, workerNode) =>
71514351 469 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
470 0
471 )
472 const totalTasksWaitTime = this.workerNodes.reduce(
473 (accumulator, workerNode) =>
71514351 474 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
475 0
476 )
8e5ca040 477 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
478 }
479
8881ae32 480 /**
aa9eede8 481 * The pool type.
8881ae32
JB
482 *
483 * If it is `'dynamic'`, it provides the `max` property.
484 */
485 protected abstract get type (): PoolType
486
184855e6 487 /**
aa9eede8 488 * The worker type.
184855e6
JB
489 */
490 protected abstract get worker (): WorkerType
491
c2ade475 492 /**
aa9eede8 493 * The pool minimum size.
c2ade475 494 */
6b27d407 495 protected abstract get minSize (): number
ff733df7
JB
496
497 /**
aa9eede8 498 * The pool maximum size.
ff733df7 499 */
6b27d407 500 protected abstract get maxSize (): number
a35560ba 501
6b813701
JB
502 /**
503 * Checks if the worker id sent in the received message from a worker is valid.
504 *
505 * @param message - The received message.
506 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
507 */
21f710aa 508 private checkMessageWorkerId (message: MessageValue<Response>): void {
310de0aa
JB
509 if (message.workerId == null) {
510 throw new Error('Worker message received without worker id')
511 } else if (
21f710aa 512 message.workerId != null &&
aad6fb64 513 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
21f710aa
JB
514 ) {
515 throw new Error(
516 `Worker message received from unknown worker '${message.workerId}'`
517 )
518 }
519 }
520
ffcbbad8 521 /**
f06e48d8 522 * Gets the given worker its worker node key.
ffcbbad8
JB
523 *
524 * @param worker - The worker.
f59e1027 525 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 526 */
aad6fb64 527 private getWorkerNodeKeyByWorker (worker: Worker): number {
f06e48d8 528 return this.workerNodes.findIndex(
8ebe6c30 529 (workerNode) => workerNode.worker === worker
f06e48d8 530 )
bf9549ae
JB
531 }
532
aa9eede8
JB
533 /**
534 * Gets the worker node key given its worker id.
535 *
536 * @param workerId - The worker id.
aad6fb64 537 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
aa9eede8 538 */
aad6fb64
JB
539 private getWorkerNodeKeyByWorkerId (workerId: number): number {
540 return this.workerNodes.findIndex(
8ebe6c30 541 (workerNode) => workerNode.info.id === workerId
aad6fb64 542 )
aa9eede8
JB
543 }
544
afc003b2 545 /** @inheritDoc */
a35560ba 546 public setWorkerChoiceStrategy (
59219cbb
JB
547 workerChoiceStrategy: WorkerChoiceStrategy,
548 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 549 ): void {
aee46736 550 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 551 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
552 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
553 this.opts.workerChoiceStrategy
554 )
555 if (workerChoiceStrategyOptions != null) {
556 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
557 }
aa9eede8 558 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
4b628b48 559 workerNode.resetUsage()
9edb9717 560 this.sendStatisticsMessageToWorker(workerNodeKey)
59219cbb 561 }
a20f0ba5
JB
562 }
563
564 /** @inheritDoc */
565 public setWorkerChoiceStrategyOptions (
566 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
567 ): void {
0d80593b 568 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
a20f0ba5
JB
569 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
570 this.workerChoiceStrategyContext.setOptions(
571 this.opts.workerChoiceStrategyOptions
a35560ba
S
572 )
573 }
574
a20f0ba5 575 /** @inheritDoc */
8f52842f
JB
576 public enableTasksQueue (
577 enable: boolean,
578 tasksQueueOptions?: TasksQueueOptions
579 ): void {
a20f0ba5 580 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 581 this.flushTasksQueues()
a20f0ba5
JB
582 }
583 this.opts.enableTasksQueue = enable
8f52842f 584 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
585 }
586
587 /** @inheritDoc */
8f52842f 588 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 589 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
590 this.checkValidTasksQueueOptions(tasksQueueOptions)
591 this.opts.tasksQueueOptions =
592 this.buildTasksQueueOptions(tasksQueueOptions)
5baee0d7 593 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
594 delete this.opts.tasksQueueOptions
595 }
596 }
597
598 private buildTasksQueueOptions (
599 tasksQueueOptions: TasksQueueOptions
600 ): TasksQueueOptions {
601 return {
602 concurrency: tasksQueueOptions?.concurrency ?? 1
603 }
604 }
605
c319c66b
JB
606 /**
607 * Whether the pool is full or not.
608 *
609 * The pool filling boolean status.
610 */
dea903a8
JB
611 protected get full (): boolean {
612 return this.workerNodes.length >= this.maxSize
613 }
c2ade475 614
c319c66b
JB
615 /**
616 * Whether the pool is busy or not.
617 *
618 * The pool busyness boolean status.
619 */
620 protected abstract get busy (): boolean
7c0ba920 621
6c6afb84 622 /**
3d76750a 623 * Whether worker nodes are executing concurrently their tasks quota or not.
6c6afb84
JB
624 *
625 * @returns Worker nodes busyness boolean status.
626 */
c2ade475 627 protected internalBusy (): boolean {
3d76750a
JB
628 if (this.opts.enableTasksQueue === true) {
629 return (
630 this.workerNodes.findIndex(
8ebe6c30 631 (workerNode) =>
3d76750a
JB
632 workerNode.info.ready &&
633 workerNode.usage.tasks.executing <
634 (this.opts.tasksQueueOptions?.concurrency as number)
635 ) === -1
636 )
637 } else {
638 return (
639 this.workerNodes.findIndex(
8ebe6c30 640 (workerNode) =>
3d76750a
JB
641 workerNode.info.ready && workerNode.usage.tasks.executing === 0
642 ) === -1
643 )
644 }
cb70b19d
JB
645 }
646
90d7d101
JB
647 /** @inheritDoc */
648 public listTaskFunctions (): string[] {
f2dbbf95
JB
649 for (const workerNode of this.workerNodes) {
650 if (
651 Array.isArray(workerNode.info.taskFunctions) &&
652 workerNode.info.taskFunctions.length > 0
653 ) {
654 return workerNode.info.taskFunctions
655 }
90d7d101 656 }
f2dbbf95 657 return []
90d7d101
JB
658 }
659
afc003b2 660 /** @inheritDoc */
7d91a8cd
JB
661 public async execute (
662 data?: Data,
663 name?: string,
664 transferList?: TransferListItem[]
665 ): Promise<Response> {
52b71763 666 return await new Promise<Response>((resolve, reject) => {
7d91a8cd
JB
667 if (name != null && typeof name !== 'string') {
668 reject(new TypeError('name argument must be a string'))
669 }
90d7d101
JB
670 if (
671 name != null &&
672 typeof name === 'string' &&
673 name.trim().length === 0
674 ) {
f58b60b9 675 reject(new TypeError('name argument must not be an empty string'))
90d7d101 676 }
b558f6b5
JB
677 if (transferList != null && !Array.isArray(transferList)) {
678 reject(new TypeError('transferList argument must be an array'))
679 }
680 const timestamp = performance.now()
681 const workerNodeKey = this.chooseWorkerNode()
a5d15204 682 const workerInfo = this.getWorkerInfo(workerNodeKey)
cea399c8
JB
683 if (
684 name != null &&
a5d15204
JB
685 Array.isArray(workerInfo.taskFunctions) &&
686 !workerInfo.taskFunctions.includes(name)
cea399c8 687 ) {
90d7d101
JB
688 reject(
689 new Error(`Task function '${name}' is not registered in the pool`)
690 )
691 }
501aea93 692 const task: Task<Data> = {
52b71763
JB
693 name: name ?? DEFAULT_TASK_NAME,
694 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
695 data: data ?? ({} as Data),
7d91a8cd 696 transferList,
52b71763 697 timestamp,
a5d15204 698 workerId: workerInfo.id as number,
7629bdf1 699 taskId: randomUUID()
52b71763 700 }
7629bdf1 701 this.promiseResponseMap.set(task.taskId as string, {
2e81254d
JB
702 resolve,
703 reject,
501aea93 704 workerNodeKey
2e81254d 705 })
52b71763 706 if (
4e377863
JB
707 this.opts.enableTasksQueue === false ||
708 (this.opts.enableTasksQueue === true &&
709 this.workerNodes[workerNodeKey].usage.tasks.executing <
b5e113f6 710 (this.opts.tasksQueueOptions?.concurrency as number))
52b71763 711 ) {
501aea93 712 this.executeTask(workerNodeKey, task)
4e377863
JB
713 } else {
714 this.enqueueTask(workerNodeKey, task)
52b71763
JB
715 }
716 this.checkAndEmitEvents()
2e81254d 717 })
280c2a77 718 }
c97c7edb 719
afc003b2 720 /** @inheritDoc */
c97c7edb 721 public async destroy (): Promise<void> {
1fbcaa7c 722 await Promise.all(
81c02522 723 this.workerNodes.map(async (_, workerNodeKey) => {
aa9eede8 724 await this.destroyWorkerNode(workerNodeKey)
1fbcaa7c
JB
725 })
726 )
ef3891a3 727 this.emitter?.emit(PoolEvents.destroy)
c97c7edb
S
728 }
729
1e3214b6
JB
730 protected async sendKillMessageToWorker (
731 workerNodeKey: number,
732 workerId: number
733 ): Promise<void> {
9edb9717 734 await new Promise<void>((resolve, reject) => {
1e3214b6
JB
735 this.registerWorkerMessageListener(workerNodeKey, (message) => {
736 if (message.kill === 'success') {
737 resolve()
738 } else if (message.kill === 'failure') {
e1af34e6 739 reject(new Error(`Worker ${workerId} kill message handling failed`))
1e3214b6
JB
740 }
741 })
9edb9717 742 this.sendToWorker(workerNodeKey, { kill: true, workerId })
1e3214b6 743 })
1e3214b6
JB
744 }
745
4a6952ff 746 /**
aa9eede8 747 * Terminates the worker node given its worker node key.
4a6952ff 748 *
aa9eede8 749 * @param workerNodeKey - The worker node key.
4a6952ff 750 */
81c02522 751 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 752
729c563d 753 /**
6677a3d3
JB
754 * Setup hook to execute code before worker nodes are created in the abstract constructor.
755 * Can be overridden.
afc003b2
JB
756 *
757 * @virtual
729c563d 758 */
280c2a77 759 protected setupHook (): void {
d99ba5a8 760 // Intentionally empty
280c2a77 761 }
c97c7edb 762
729c563d 763 /**
280c2a77
S
764 * Should return whether the worker is the main worker or not.
765 */
766 protected abstract isMain (): boolean
767
768 /**
2e81254d 769 * Hook executed before the worker task execution.
bf9549ae 770 * Can be overridden.
729c563d 771 *
f06e48d8 772 * @param workerNodeKey - The worker node key.
1c6fe997 773 * @param task - The task to execute.
729c563d 774 */
1c6fe997
JB
775 protected beforeTaskExecutionHook (
776 workerNodeKey: number,
777 task: Task<Data>
778 ): void {
f59e1027 779 const workerUsage = this.workerNodes[workerNodeKey].usage
1c6fe997
JB
780 ++workerUsage.tasks.executing
781 this.updateWaitTimeWorkerUsage(workerUsage, task)
b558f6b5
JB
782 if (this.canUpdateTaskWorkerUsage(workerNodeKey)) {
783 const taskWorkerUsage = this.workerNodes[
784 workerNodeKey
785 ].getTaskWorkerUsage(task.name as string) as WorkerUsage
786 ++taskWorkerUsage.tasks.executing
787 this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
788 }
c97c7edb
S
789 }
790
c01733f1 791 /**
2e81254d 792 * Hook executed after the worker task execution.
bf9549ae 793 * Can be overridden.
c01733f1 794 *
501aea93 795 * @param workerNodeKey - The worker node key.
38e795c1 796 * @param message - The received message.
c01733f1 797 */
2e81254d 798 protected afterTaskExecutionHook (
501aea93 799 workerNodeKey: number,
2740a743 800 message: MessageValue<Response>
bf9549ae 801 ): void {
ff128cc9 802 const workerUsage = this.workerNodes[workerNodeKey].usage
f1c06930
JB
803 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
804 this.updateRunTimeWorkerUsage(workerUsage, message)
805 this.updateEluWorkerUsage(workerUsage, message)
b558f6b5
JB
806 if (this.canUpdateTaskWorkerUsage(workerNodeKey)) {
807 const taskWorkerUsage = this.workerNodes[
808 workerNodeKey
809 ].getTaskWorkerUsage(
810 message.taskPerformance?.name ?? DEFAULT_TASK_NAME
811 ) as WorkerUsage
812 this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
813 this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
814 this.updateEluWorkerUsage(taskWorkerUsage, message)
815 }
816 }
817
818 private canUpdateTaskWorkerUsage (workerNodeKey: number): boolean {
a5d15204 819 const workerInfo = this.getWorkerInfo(workerNodeKey)
b558f6b5 820 return (
a5d15204
JB
821 Array.isArray(workerInfo.taskFunctions) &&
822 workerInfo.taskFunctions.length > 1
b558f6b5 823 )
f1c06930
JB
824 }
825
826 private updateTaskStatisticsWorkerUsage (
827 workerUsage: WorkerUsage,
828 message: MessageValue<Response>
829 ): void {
a4e07f72
JB
830 const workerTaskStatistics = workerUsage.tasks
831 --workerTaskStatistics.executing
98e72cda
JB
832 if (message.taskError == null) {
833 ++workerTaskStatistics.executed
834 } else {
a4e07f72 835 ++workerTaskStatistics.failed
2740a743 836 }
f8eb0a2a
JB
837 }
838
a4e07f72
JB
839 private updateRunTimeWorkerUsage (
840 workerUsage: WorkerUsage,
f8eb0a2a
JB
841 message: MessageValue<Response>
842 ): void {
e4f20deb
JB
843 updateMeasurementStatistics(
844 workerUsage.runTime,
845 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
846 message.taskPerformance?.runTime ?? 0,
847 workerUsage.tasks.executed
848 )
f8eb0a2a
JB
849 }
850
a4e07f72
JB
851 private updateWaitTimeWorkerUsage (
852 workerUsage: WorkerUsage,
1c6fe997 853 task: Task<Data>
f8eb0a2a 854 ): void {
1c6fe997
JB
855 const timestamp = performance.now()
856 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
e4f20deb
JB
857 updateMeasurementStatistics(
858 workerUsage.waitTime,
859 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
860 taskWaitTime,
861 workerUsage.tasks.executed
862 )
c01733f1 863 }
864
a4e07f72 865 private updateEluWorkerUsage (
5df69fab 866 workerUsage: WorkerUsage,
62c15a68
JB
867 message: MessageValue<Response>
868 ): void {
008512c7
JB
869 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
870 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
e4f20deb
JB
871 updateMeasurementStatistics(
872 workerUsage.elu.active,
008512c7 873 eluTaskStatisticsRequirements,
e4f20deb
JB
874 message.taskPerformance?.elu?.active ?? 0,
875 workerUsage.tasks.executed
876 )
877 updateMeasurementStatistics(
878 workerUsage.elu.idle,
008512c7 879 eluTaskStatisticsRequirements,
e4f20deb
JB
880 message.taskPerformance?.elu?.idle ?? 0,
881 workerUsage.tasks.executed
882 )
008512c7 883 if (eluTaskStatisticsRequirements.aggregate) {
f7510105 884 if (message.taskPerformance?.elu != null) {
f7510105
JB
885 if (workerUsage.elu.utilization != null) {
886 workerUsage.elu.utilization =
887 (workerUsage.elu.utilization +
888 message.taskPerformance.elu.utilization) /
889 2
890 } else {
891 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
892 }
62c15a68
JB
893 }
894 }
895 }
896
280c2a77 897 /**
f06e48d8 898 * Chooses a worker node for the next task.
280c2a77 899 *
6c6afb84 900 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 901 *
aa9eede8 902 * @returns The chosen worker node key
280c2a77 903 */
6c6afb84 904 private chooseWorkerNode (): number {
930dcf12 905 if (this.shallCreateDynamicWorker()) {
aa9eede8 906 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
6c6afb84
JB
907 if (
908 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
909 ) {
aa9eede8 910 return workerNodeKey
6c6afb84 911 }
17393ac8 912 }
930dcf12
JB
913 return this.workerChoiceStrategyContext.execute()
914 }
915
6c6afb84
JB
916 /**
917 * Conditions for dynamic worker creation.
918 *
919 * @returns Whether to create a dynamic worker or not.
920 */
921 private shallCreateDynamicWorker (): boolean {
930dcf12 922 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
923 }
924
280c2a77 925 /**
aa9eede8 926 * Sends a message to worker given its worker node key.
280c2a77 927 *
aa9eede8 928 * @param workerNodeKey - The worker node key.
38e795c1 929 * @param message - The message.
7d91a8cd 930 * @param transferList - The optional array of transferable objects.
280c2a77
S
931 */
932 protected abstract sendToWorker (
aa9eede8 933 workerNodeKey: number,
7d91a8cd
JB
934 message: MessageValue<Data>,
935 transferList?: TransferListItem[]
280c2a77
S
936 ): void
937
729c563d 938 /**
41344292 939 * Creates a new worker.
6c6afb84
JB
940 *
941 * @returns Newly created worker.
729c563d 942 */
280c2a77 943 protected abstract createWorker (): Worker
c97c7edb 944
4a6952ff 945 /**
aa9eede8 946 * Creates a new, completely set up worker node.
4a6952ff 947 *
aa9eede8 948 * @returns New, completely set up worker node key.
4a6952ff 949 */
aa9eede8 950 protected createAndSetupWorkerNode (): number {
bdacc2d2 951 const worker = this.createWorker()
280c2a77 952
fd04474e 953 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
35cf1c03 954 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 955 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
8ebe6c30 956 worker.on('error', (error) => {
aad6fb64 957 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
9b106837
JB
958 const workerInfo = this.getWorkerInfo(workerNodeKey)
959 workerInfo.ready = false
0dc838e3 960 this.workerNodes[workerNodeKey].closeChannel()
2a69b8c5 961 this.emitter?.emit(PoolEvents.error, error)
2431bdb4 962 if (this.opts.restartWorkerOnError === true && !this.starting) {
9b106837 963 if (workerInfo.dynamic) {
aa9eede8 964 this.createAndSetupDynamicWorkerNode()
8a1260a3 965 } else {
aa9eede8 966 this.createAndSetupWorkerNode()
8a1260a3 967 }
5baee0d7 968 }
19dbc45b 969 if (this.opts.enableTasksQueue === true) {
9b106837 970 this.redistributeQueuedTasks(workerNodeKey)
19dbc45b 971 }
5baee0d7 972 })
a35560ba 973 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 974 worker.once('exit', () => {
f06e48d8 975 this.removeWorkerNode(worker)
a974afa6 976 })
280c2a77 977
aa9eede8 978 const workerNodeKey = this.addWorkerNode(worker)
280c2a77 979
aa9eede8 980 this.afterWorkerNodeSetup(workerNodeKey)
280c2a77 981
aa9eede8 982 return workerNodeKey
c97c7edb 983 }
be0676b3 984
930dcf12 985 /**
aa9eede8 986 * Creates a new, completely set up dynamic worker node.
930dcf12 987 *
aa9eede8 988 * @returns New, completely set up dynamic worker node key.
930dcf12 989 */
aa9eede8
JB
990 protected createAndSetupDynamicWorkerNode (): number {
991 const workerNodeKey = this.createAndSetupWorkerNode()
8ebe6c30 992 this.registerWorkerMessageListener(workerNodeKey, (message) => {
aa9eede8
JB
993 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
994 message.workerId
aad6fb64 995 )
aa9eede8 996 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
81c02522 997 // Kill message received from worker
930dcf12
JB
998 if (
999 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1e3214b6 1000 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
7b56f532 1001 ((this.opts.enableTasksQueue === false &&
aa9eede8 1002 workerUsage.tasks.executing === 0) ||
7b56f532 1003 (this.opts.enableTasksQueue === true &&
aa9eede8
JB
1004 workerUsage.tasks.executing === 0 &&
1005 this.tasksQueueSize(localWorkerNodeKey) === 0)))
930dcf12 1006 ) {
5270d253
JB
1007 this.destroyWorkerNode(localWorkerNodeKey).catch((error) => {
1008 this.emitter?.emit(PoolEvents.error, error)
1009 })
930dcf12
JB
1010 }
1011 })
aa9eede8 1012 const workerInfo = this.getWorkerInfo(workerNodeKey)
aa9eede8 1013 this.sendToWorker(workerNodeKey, {
b0a4db63 1014 checkActive: true,
21f710aa
JB
1015 workerId: workerInfo.id as number
1016 })
b5e113f6
JB
1017 workerInfo.dynamic = true
1018 if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
1019 workerInfo.ready = true
1020 }
aa9eede8 1021 return workerNodeKey
930dcf12
JB
1022 }
1023
a2ed5053 1024 /**
aa9eede8 1025 * Registers a listener callback on the worker given its worker node key.
a2ed5053 1026 *
aa9eede8 1027 * @param workerNodeKey - The worker node key.
a2ed5053
JB
1028 * @param listener - The message listener callback.
1029 */
85aeb3f3
JB
1030 protected abstract registerWorkerMessageListener<
1031 Message extends Data | Response
aa9eede8
JB
1032 >(
1033 workerNodeKey: number,
1034 listener: (message: MessageValue<Message>) => void
1035 ): void
a2ed5053
JB
1036
1037 /**
aa9eede8 1038 * Method hooked up after a worker node has been newly created.
a2ed5053
JB
1039 * Can be overridden.
1040 *
aa9eede8 1041 * @param workerNodeKey - The newly created worker node key.
a2ed5053 1042 */
aa9eede8 1043 protected afterWorkerNodeSetup (workerNodeKey: number): void {
a2ed5053 1044 // Listen to worker messages.
aa9eede8 1045 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
85aeb3f3 1046 // Send the startup message to worker.
aa9eede8 1047 this.sendStartupMessageToWorker(workerNodeKey)
9edb9717
JB
1048 // Send the statistics message to worker.
1049 this.sendStatisticsMessageToWorker(workerNodeKey)
d2c73f82
JB
1050 }
1051
85aeb3f3 1052 /**
aa9eede8
JB
1053 * Sends the startup message to worker given its worker node key.
1054 *
1055 * @param workerNodeKey - The worker node key.
1056 */
1057 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1058
1059 /**
9edb9717 1060 * Sends the statistics message to worker given its worker node key.
85aeb3f3 1061 *
aa9eede8 1062 * @param workerNodeKey - The worker node key.
85aeb3f3 1063 */
9edb9717 1064 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
aa9eede8
JB
1065 this.sendToWorker(workerNodeKey, {
1066 statistics: {
1067 runTime:
1068 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1069 .runTime.aggregate,
1070 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1071 .elu.aggregate
1072 },
1073 workerId: this.getWorkerInfo(workerNodeKey).id as number
1074 })
1075 }
a2ed5053
JB
1076
1077 private redistributeQueuedTasks (workerNodeKey: number): void {
1078 while (this.tasksQueueSize(workerNodeKey) > 0) {
1079 let targetWorkerNodeKey: number = workerNodeKey
1080 let minQueuedTasks = Infinity
10ecf8fd 1081 let executeTask = false
a2ed5053
JB
1082 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
1083 const workerInfo = this.getWorkerInfo(workerNodeId)
1084 if (
1085 workerNodeId !== workerNodeKey &&
1086 workerInfo.ready &&
1087 workerNode.usage.tasks.queued === 0
1088 ) {
a5ed75b7
JB
1089 if (
1090 this.workerNodes[workerNodeId].usage.tasks.executing <
1091 (this.opts.tasksQueueOptions?.concurrency as number)
1092 ) {
10ecf8fd
JB
1093 executeTask = true
1094 }
a2ed5053
JB
1095 targetWorkerNodeKey = workerNodeId
1096 break
1097 }
1098 if (
1099 workerNodeId !== workerNodeKey &&
1100 workerInfo.ready &&
1101 workerNode.usage.tasks.queued < minQueuedTasks
1102 ) {
1103 minQueuedTasks = workerNode.usage.tasks.queued
1104 targetWorkerNodeKey = workerNodeId
1105 }
1106 }
10ecf8fd
JB
1107 if (executeTask) {
1108 this.executeTask(
1109 targetWorkerNodeKey,
1110 this.dequeueTask(workerNodeKey) as Task<Data>
1111 )
1112 } else {
1113 this.enqueueTask(
1114 targetWorkerNodeKey,
1115 this.dequeueTask(workerNodeKey) as Task<Data>
1116 )
1117 }
a2ed5053
JB
1118 }
1119 }
1120
be0676b3 1121 /**
aa9eede8 1122 * This method is the listener registered for each worker message.
be0676b3 1123 *
bdacc2d2 1124 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
1125 */
1126 protected workerListener (): (message: MessageValue<Response>) => void {
8ebe6c30 1127 return (message) => {
21f710aa 1128 this.checkMessageWorkerId(message)
a5d15204 1129 if (message.ready != null && message.taskFunctions != null) {
81c02522 1130 // Worker ready response received from worker
10e2aa7e 1131 this.handleWorkerReadyResponse(message)
7629bdf1 1132 } else if (message.taskId != null) {
81c02522 1133 // Task execution response received from worker
6b272951 1134 this.handleTaskExecutionResponse(message)
90d7d101
JB
1135 } else if (message.taskFunctions != null) {
1136 // Task functions message received from worker
b558f6b5
JB
1137 this.getWorkerInfo(
1138 this.getWorkerNodeKeyByWorkerId(message.workerId)
1139 ).taskFunctions = message.taskFunctions
6b272951
JB
1140 }
1141 }
1142 }
1143
10e2aa7e 1144 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
f05ed93c
JB
1145 if (message.ready === false) {
1146 throw new Error(`Worker ${message.workerId} failed to initialize`)
1147 }
a5d15204 1148 const workerInfo = this.getWorkerInfo(
aad6fb64 1149 this.getWorkerNodeKeyByWorkerId(message.workerId)
a5d15204
JB
1150 )
1151 workerInfo.ready = message.ready as boolean
1152 workerInfo.taskFunctions = message.taskFunctions
2431bdb4
JB
1153 if (this.emitter != null && this.ready) {
1154 this.emitter.emit(PoolEvents.ready, this.info)
1155 }
6b272951
JB
1156 }
1157
1158 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
5441aea6
JB
1159 const { taskId, taskError, data } = message
1160 const promiseResponse = this.promiseResponseMap.get(taskId as string)
6b272951 1161 if (promiseResponse != null) {
5441aea6
JB
1162 if (taskError != null) {
1163 this.emitter?.emit(PoolEvents.taskError, taskError)
1164 promiseResponse.reject(taskError.message)
6b272951 1165 } else {
5441aea6 1166 promiseResponse.resolve(data as Response)
6b272951 1167 }
501aea93
JB
1168 const workerNodeKey = promiseResponse.workerNodeKey
1169 this.afterTaskExecutionHook(workerNodeKey, message)
5441aea6 1170 this.promiseResponseMap.delete(taskId as string)
6b272951
JB
1171 if (
1172 this.opts.enableTasksQueue === true &&
b5e113f6
JB
1173 this.tasksQueueSize(workerNodeKey) > 0 &&
1174 this.workerNodes[workerNodeKey].usage.tasks.executing <
1175 (this.opts.tasksQueueOptions?.concurrency as number)
6b272951
JB
1176 ) {
1177 this.executeTask(
1178 workerNodeKey,
1179 this.dequeueTask(workerNodeKey) as Task<Data>
1180 )
be0676b3 1181 }
6b272951 1182 this.workerChoiceStrategyContext.update(workerNodeKey)
be0676b3 1183 }
be0676b3 1184 }
7c0ba920 1185
ff733df7 1186 private checkAndEmitEvents (): void {
1f68cede 1187 if (this.emitter != null) {
ff733df7 1188 if (this.busy) {
2845f2a5 1189 this.emitter.emit(PoolEvents.busy, this.info)
ff733df7 1190 }
6b27d407 1191 if (this.type === PoolTypes.dynamic && this.full) {
2845f2a5 1192 this.emitter.emit(PoolEvents.full, this.info)
ff733df7 1193 }
164d950a
JB
1194 }
1195 }
1196
8a1260a3 1197 /**
aa9eede8 1198 * Gets the worker information given its worker node key.
8a1260a3
JB
1199 *
1200 * @param workerNodeKey - The worker node key.
3f09ed9f 1201 * @returns The worker information.
8a1260a3 1202 */
aa9eede8 1203 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
dc02fc29 1204 return this.workerNodes[workerNodeKey].info
e221309a
JB
1205 }
1206
a05c10de 1207 /**
b0a4db63 1208 * Adds the given worker in the pool worker nodes.
ea7a90d3 1209 *
38e795c1 1210 * @param worker - The worker.
aa9eede8
JB
1211 * @returns The added worker node key.
1212 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
ea7a90d3 1213 */
b0a4db63 1214 private addWorkerNode (worker: Worker): number {
cc3ab78b 1215 const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
b97d82d8 1216 // Flag the worker node as ready at pool startup.
d2c73f82
JB
1217 if (this.starting) {
1218 workerNode.info.ready = true
1219 }
aa9eede8 1220 this.workerNodes.push(workerNode)
aad6fb64 1221 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
aa9eede8 1222 if (workerNodeKey === -1) {
a5d15204 1223 throw new Error('Worker node added not found')
aa9eede8
JB
1224 }
1225 return workerNodeKey
ea7a90d3 1226 }
c923ce56 1227
51fe3d3c 1228 /**
f06e48d8 1229 * Removes the given worker from the pool worker nodes.
51fe3d3c 1230 *
f06e48d8 1231 * @param worker - The worker.
51fe3d3c 1232 */
416fd65c 1233 private removeWorkerNode (worker: Worker): void {
aad6fb64 1234 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1f68cede
JB
1235 if (workerNodeKey !== -1) {
1236 this.workerNodes.splice(workerNodeKey, 1)
1237 this.workerChoiceStrategyContext.remove(workerNodeKey)
1238 }
51fe3d3c 1239 }
adc3c320 1240
b0a4db63 1241 /**
aa9eede8 1242 * Executes the given task on the worker given its worker node key.
b0a4db63 1243 *
aa9eede8 1244 * @param workerNodeKey - The worker node key.
b0a4db63
JB
1245 * @param task - The task to execute.
1246 */
2e81254d 1247 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1248 this.beforeTaskExecutionHook(workerNodeKey, task)
bbfa38a2 1249 this.sendToWorker(workerNodeKey, task, task.transferList)
2e81254d
JB
1250 }
1251
f9f00b5f 1252 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
4b628b48 1253 return this.workerNodes[workerNodeKey].enqueueTask(task)
adc3c320
JB
1254 }
1255
416fd65c 1256 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1257 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1258 }
1259
416fd65c 1260 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1261 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1262 }
1263
81c02522 1264 protected flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1265 while (this.tasksQueueSize(workerNodeKey) > 0) {
1266 this.executeTask(
1267 workerNodeKey,
1268 this.dequeueTask(workerNodeKey) as Task<Data>
1269 )
ff733df7 1270 }
4b628b48 1271 this.workerNodes[workerNodeKey].clearTasksQueue()
ff733df7
JB
1272 }
1273
ef41a6e6
JB
1274 private flushTasksQueues (): void {
1275 for (const [workerNodeKey] of this.workerNodes.entries()) {
1276 this.flushTasksQueue(workerNodeKey)
1277 }
1278 }
c97c7edb 1279}