fix: fix race condition between ready and task functions message at
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
59317253 14 isKillBehavior,
0d80593b 15 isPlainObject,
afe0d5bf 16 median,
e4f20deb
JB
17 round,
18 updateMeasurementStatistics
bbeadd16 19} from '../utils'
59317253 20import { KillBehaviors } from '../worker/worker-options'
c4855468 21import {
65d7a1c9 22 type IPool,
7c5a1080 23 PoolEmitter,
c4855468 24 PoolEvents,
6b27d407 25 type PoolInfo,
c4855468 26 type PoolOptions,
6b27d407
JB
27 type PoolType,
28 PoolTypes,
4b628b48 29 type TasksQueueOptions
c4855468 30} from './pool'
bbfa38a2
JB
31import type {
32 IWorker,
33 IWorkerNode,
34 WorkerInfo,
35 WorkerType,
36 WorkerUsage
e102732c 37} from './worker'
a35560ba 38import {
008512c7 39 type MeasurementStatisticsRequirements,
f0d7f803 40 Measurements,
a35560ba 41 WorkerChoiceStrategies,
a20f0ba5
JB
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
bdaf31cd
JB
44} from './selection-strategies/selection-strategies-types'
45import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 46import { version } from './version'
4b628b48 47import { WorkerNode } from './worker-node'
23ccf9d7 48
729c563d 49/**
ea7a90d3 50 * Base class that implements some shared logic for all poolifier pools.
729c563d 51 *
38e795c1 52 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 55 */
c97c7edb 56export abstract class AbstractPool<
f06e48d8 57 Worker extends IWorker,
d3c8a1a8
S
58 Data = unknown,
59 Response = unknown
c4855468 60> implements IPool<Worker, Data, Response> {
afc003b2 61 /** @inheritDoc */
4b628b48 62 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 63
afc003b2 64 /** @inheritDoc */
7c0ba920
JB
65 public readonly emitter?: PoolEmitter
66
be0676b3 67 /**
52b71763 68 * The task execution response promise map.
be0676b3 69 *
2740a743 70 * - `key`: The message id of each submitted task.
a3445496 71 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 72 *
a3445496 73 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 74 */
501aea93
JB
75 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
76 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 77
a35560ba 78 /**
51fe3d3c 79 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
82 Worker,
83 Data,
84 Response
a35560ba
S
85 >
86
075e51d1 87 /**
adc9cc64 88 * Whether the pool is starting or not.
075e51d1
JB
89 */
90 private readonly starting: boolean
afe0d5bf
JB
91 /**
92 * The start timestamp of the pool.
93 */
94 private readonly startTimestamp
95
729c563d
S
/**
 * Constructs a new poolifier pool.
 *
 * Validates the arguments, binds the internal callbacks, creates the optional
 * event emitter and the worker choice strategy context, runs the setup hook
 * and finally starts the pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  // A pool can only be started from the main side.
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation (each check throws on invalid input).
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bind the methods that get handed around as callbacks.
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
  this.dequeueTask = this.dequeueTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  this.setupHook()

  // The starting flag is raised only for the duration of the initial start.
  this.starting = true
  this.startPool()
  this.starting = false

  this.startTimestamp = performance.now()
}
144
/**
 * Checks that the worker file path is a non blank string pointing to an existing file.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is missing, not a string, blank, or does not exist.
 */
private checkFilePath (filePath: string): void {
  // A nullish or non string value fails the typeof test, a blank string the length test.
  const isUsablePath =
    typeof filePath === 'string' && filePath.trim().length > 0
  if (!isUsablePath) {
    throw new Error('Please specify a file with a worker implementation')
  }
  if (!existsSync(filePath)) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
157
8d3782fa
JB
/**
 * Checks the number of workers the pool is asked to manage.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws Error if the number of workers is not specified.
 * @throws TypeError if the number of workers is not a safe integer.
 * @throws RangeError if the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Only a fixed pool is refused with zero worker.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
175
/**
 * Checks the minimum and maximum sizes of a dynamic pool.
 * Does nothing when the pool type is not dynamic.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws Error if the maximum size is not specified.
 * @throws TypeError if the maximum size is not a safe integer.
 * @throws RangeError if the maximum size is zero, lower than or equal to the minimum size.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new Error(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
201
/**
 * Validates the pool options and fills in the default values on `this.opts`.
 *
 * @param opts - The pool options.
 * @throws TypeError if the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  // Worker choice strategy, round robin by default.
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)

  // Worker choice strategy options.
  this.opts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ??
    DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  // Boolean switches.
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    this.checkValidTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
228
/**
 * Checks that the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws Error if the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
238
0d80593b
JB
/**
 * Checks the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if weights are given but their count differs from the pool maximum size,
 * or if the measurement is not one of the `Measurements` values.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  // One weight is expected per worker node, up to the pool maximum size.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
266
a20f0ba5
JB
/**
 * Checks the tasks queue options. Nullish options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws TypeError if the options are not a plain object or the concurrency is not a safe integer.
 * @throws Error if the concurrency is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    // Nothing else to validate without an explicit concurrency.
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
290
e761c033
JB
/**
 * Starts the pool: creates and sets up worker nodes until the count of
 * non dynamic worker nodes reaches the requested number of workers.
 */
private startPool (): void {
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter((node) => !node.info.dynamic).length
  while (countNonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
302
/** @inheritDoc */
public get info (): PoolInfo {
  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is reported only when both the run time and the wait time
    // aggregate task statistics are required.
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .runTime.aggregate &&
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
        .waitTime.aggregate && { utilization: round(this.utilization) }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: this.workerNodes.filter(
      (node) => node.usage.tasks.executing === 0
    ).length,
    busyWorkerNodes: this.workerNodes.filter(
      (node) => node.usage.tasks.executing > 0
    ).length,
    executedTasks: this.workerNodes.reduce(
      (total, node) => total + node.usage.tasks.executed,
      0
    ),
    executingTasks: this.workerNodes.reduce(
      (total, node) => total + node.usage.tasks.executing,
      0
    ),
    // Queue counters are reported only when the tasks queue is enabled.
    ...(this.opts.enableTasksQueue === true && {
      queuedTasks: this.workerNodes.reduce(
        (total, node) => total + node.usage.tasks.queued,
        0
      ),
      maxQueuedTasks: this.workerNodes.reduce(
        (total, node) => total + (node.usage.tasks?.maxQueued ?? 0),
        0
      )
    }),
    failedTasks: this.workerNodes.reduce(
      (total, node) => total + node.usage.tasks.failed,
      0
    ),
    // Run time statistics across all worker nodes.
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .runTime.aggregate && {
      runTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              (node) => node.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              (node) => node.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        // Sum of the aggregated run times divided by the sum of executed tasks.
        average: round(
          this.workerNodes.reduce(
            (total, node) => total + (node.usage.runTime?.aggregate ?? 0),
            0
          ) /
            this.workerNodes.reduce(
              (total, node) => total + (node.usage.tasks?.executed ?? 0),
              0
            )
        ),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .runTime.median && {
          median: round(
            median(
              this.workerNodes.map((node) => node.usage.runTime?.median ?? 0)
            )
          )
        })
      }
    }),
    // Wait time statistics across all worker nodes.
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .waitTime.aggregate && {
      waitTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              (node) => node.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              (node) => node.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        // Sum of the aggregated wait times divided by the sum of executed tasks.
        average: round(
          this.workerNodes.reduce(
            (total, node) => total + (node.usage.waitTime?.aggregate ?? 0),
            0
          ) /
            this.workerNodes.reduce(
              (total, node) => total + (node.usage.tasks?.executed ?? 0),
              0
            )
        ),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .waitTime.median && {
          median: round(
            median(
              this.workerNodes.map((node) => node.usage.waitTime?.median ?? 0)
            )
          )
        })
      }
    })
  }
}
08f3f44c 443
aa9eede8
JB
/**
 * The pool readiness boolean status: `true` once the number of ready, non
 * dynamic worker nodes is at least the pool minimum size.
 */
private get ready (): boolean {
  const readyNonDynamicWorkerNodes = this.workerNodes.filter(
    (node) => !node.info.dynamic && node.info.ready
  ).length
  return readyNonDynamicWorkerNodes >= this.minSize
}
458
/**
 * The approximate pool utilization.
 *
 * Computed as the sum of the aggregated tasks run time and wait time of all
 * worker nodes, divided by the time elapsed since the pool start timestamp
 * multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  for (const node of this.workerNodes) {
    totalTasksRunTime += node.usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const node of this.workerNodes) {
    totalTasksWaitTime += node.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
479
/**
 * The pool type, as reported in the pool information.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 */
protected abstract get type (): PoolType

/**
 * The worker type, as reported in the pool information.
 */
protected abstract get worker (): WorkerType

/**
 * The pool minimum size.
 * The pool is considered ready once that many non dynamic worker nodes are ready.
 */
protected abstract get minSize (): number

/**
 * The pool maximum size.
 * The pool is considered full once it holds that many worker nodes.
 */
protected abstract get maxSize (): number
a35560ba 501
6b813701
JB
/**
 * Checks if the worker id sent in the received message from a worker is valid.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or unknown to the pool.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  // A key of -1 means no worker node of the pool carries that worker id.
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
520
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  const ownsWorker = (node: IWorkerNode<Worker, Data>): boolean =>
    node.worker === worker
  return this.workerNodes.findIndex(ownsWorker)
}
532
aa9eede8
JB
/**
 * Gets the worker node key of the worker node whose worker info carries the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  const hasWorkerId = (node: IWorkerNode<Worker, Data>): boolean =>
    node.info.id === workerId
  return this.workerNodes.findIndex(hasWorkerId)
}
544
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  // Store the strategy, then propagate it to the strategy context.
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Every worker node has its usage reset and is sent a statistics message.
  for (const [nodeKey, node] of this.workerNodes.entries()) {
    node.resetUsage()
    this.sendStatisticsMessageToWorker(nodeKey)
  }
}
563
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first: invalid options throw before anything is stored.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const { opts, workerChoiceStrategyContext } = this
  opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  workerChoiceStrategyContext.setOptions(opts.workerChoiceStrategyOptions)
}
574
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  // Switching the queue off: flush what is still queued before disabling it.
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
586
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue === true) {
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
    return
  }
  // Queue disabled: drop any previously stored tasks queue options.
  if (this.opts.tasksQueueOptions != null) {
    delete this.opts.tasksQueueOptions
  }
}
597
/**
 * Builds the tasks queue options, defaulting the concurrency to `1`.
 *
 * @param tasksQueueOptions - The tasks queue options given by the user, possibly nullish.
 * @returns The tasks queue options with the concurrency set.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
605
c319c66b
JB
/**
 * Whether the pool is full or not.
 *
 * The pool is full once its number of worker nodes has reached the pool maximum size.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
c2ade475 614
c319c66b
JB
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status, to be provided by the concrete pool.
 */
protected abstract get busy (): boolean
7c0ba920 621
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled, a ready worker node is available while it
 * executes fewer tasks than the configured concurrency; without it, a ready
 * worker node is available only while it executes no task.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  if (this.opts.enableTasksQueue === true) {
    const hasAvailableWorkerNode = this.workerNodes.some(
      (node) =>
        node.info.ready &&
        node.usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
    )
    return !hasAvailableWorkerNode
  }
  const hasIdleWorkerNode = this.workerNodes.some(
    (node) => node.info.ready && node.usage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
646
90d7d101
JB
/**
 * @inheritDoc
 *
 * Scans the worker nodes in order and returns the task functions of the first
 * one that has a non empty list. Looking only at the first worker node would
 * make the result depend on that node existing and having already reported
 * its task functions.
 *
 * @returns The task function names, or an empty array if no worker node has any.
 */
public listTaskFunctions (): string[] {
  for (const workerNode of this.workerNodes) {
    const taskFunctions = workerNode.info.taskFunctions
    if (Array.isArray(taskFunctions) && taskFunctions.length > 0) {
      return taskFunctions
    }
  }
  return []
}
658
/**
 * @inheritDoc
 *
 * The returned promise is rejected with a `TypeError` when `name` is not a
 * string or is a blank string, or when `transferList` is not an array, and
 * with an `Error` when `name` is not among the task functions of the chosen
 * worker. In all those cases no task is submitted.
 */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Each validation failure must reject AND return: calling reject() does
    // not stop the executor, so without the early return the invalid task
    // would still be registered and dispatched to a worker.
    if (name != null && typeof name !== 'string') {
      reject(new TypeError('name argument must be a string'))
      return
    }
    if (
      name != null &&
      typeof name === 'string' &&
      name.trim().length === 0
    ) {
      reject(new TypeError('name argument must not be an empty string'))
      return
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    // Timestamp taken before choosing the worker node: it is the reference
    // used later to compute the task wait time.
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const workerInfo = this.getWorkerInfo(workerNodeKey)
    if (
      name != null &&
      Array.isArray(workerInfo.taskFunctions) &&
      !workerInfo.taskFunctions.includes(name)
    ) {
      reject(
        new Error(`Task function '${name}' is not registered in the pool`)
      )
      return
    }
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      workerId: workerInfo.id as number,
      taskId: randomUUID()
    }
    // Keep the promise callbacks so the response message can settle it later.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })
    // Execute right away unless the tasks queue is enabled and the worker
    // node already executes its concurrency quota, in which case enqueue.
    if (
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.workerNodes[workerNodeKey].usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number))
    ) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
    this.checkAndEmitEvents()
  })
}
c97c7edb 718
/** @inheritDoc */
public async destroy (): Promise<void> {
  // Destroy all worker nodes concurrently, then emit the destroy event.
  const destructions = this.workerNodes.map(async (_, nodeKey) => {
    await this.destroyWorkerNode(nodeKey)
  })
  await Promise.all(destructions)
  this.emitter?.emit(PoolEvents.destroy)
}
728
1e3214b6
JB
/**
 * Sends a kill message to the worker of the given worker node and waits for its answer.
 *
 * @param workerNodeKey - The worker node key.
 * @param workerId - The worker id.
 * @returns A promise resolved on a `'success'` kill answer and rejected on a `'failure'` one.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number,
  workerId: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // The listener is registered before the kill message is sent.
    this.registerWorkerMessageListener(workerNodeKey, (message) => {
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(new Error(`Worker ${workerId} kill message handling failed`))
          break
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true, workerId })
  })
}
744
/**
 * Terminates the worker node given its worker node key.
 * Called for every worker node when the pool is destroyed.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 751
/**
 * Setup hook called by the abstract constructor before the pool is started,
 * i.e. before any worker node is created.
 * Does nothing by default and can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
c97c7edb 761
/**
 * Should return whether the worker is the main worker or not.
 * The constructor throws when this returns `false`.
 */
protected abstract isMain (): boolean
766
/**
 * Hook executed before the worker task execution.
 * Increments the executing tasks counter and updates the wait time of the
 * worker node usage and, when applicable, of the task function usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const markTaskStarted = (usage: WorkerUsage): void => {
    ++usage.tasks.executing
    this.updateWaitTimeWorkerUsage(usage, task)
  }
  const workerNode = this.workerNodes[workerNodeKey]
  markTaskStarted(workerNode.usage)
  if (this.canUpdateTaskWorkerUsage(workerNodeKey)) {
    markTaskStarted(
      this.workerNodes[workerNodeKey].getTaskWorkerUsage(
        task.name as string
      ) as WorkerUsage
    )
  }
}
789
/**
 * Hook executed after the worker task execution.
 * Updates the tasks, run time and ELU statistics of the worker node usage
 * and, when applicable, of the task function usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const applyMessage = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  applyMessage(this.workerNodes[workerNodeKey].usage)
  if (this.canUpdateTaskWorkerUsage(workerNodeKey)) {
    // Falls back to the default task name when the message carries none.
    const taskName = message.taskPerformance?.name ?? DEFAULT_TASK_NAME
    applyMessage(
      this.workerNodes[workerNodeKey].getTaskWorkerUsage(
        taskName
      ) as WorkerUsage
    )
  }
}
816
/**
 * Whether the per task function usage of the given worker node shall be updated,
 * i.e. whether its worker info lists more than one task function.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker info has an array of more than one task function, `false` otherwise.
 */
private canUpdateTaskWorkerUsage (workerNodeKey: number): boolean {
  const { taskFunctions } = this.getWorkerInfo(workerNodeKey)
  if (!Array.isArray(taskFunctions)) {
    return false
  }
  return taskFunctions.length > 1
}
824
/**
 * Updates the tasks counters of the given usage after a task execution:
 * one task less executing, one more executed or failed.
 *
 * @param workerUsage - The usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  tasks.executing -= 1
  // A task error in the message marks the task as failed.
  if (message.taskError != null) {
    tasks.failed += 1
  } else {
    tasks.executed += 1
  }
}
837
a4e07f72
JB
/**
 * Updates the run time measurement statistics of the given usage with the
 * task run time carried by the message (`0` when absent).
 *
 * @param workerUsage - The usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  updateMeasurementStatistics(
    workerUsage.runTime,
    runTimeRequirements,
    taskRunTime,
    workerUsage.tasks.executed
  )
}
849
a4e07f72
JB
/**
 * Updates the wait time measurement statistics of the given usage with the
 * time elapsed since the task timestamp (`0` when the task has no timestamp).
 *
 * @param workerUsage - The usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const elapsedSinceSubmission = now - (task.timestamp ?? now)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    elapsedSinceSubmission,
    workerUsage.tasks.executed
  )
}
863
/**
 * Updates the ELU (active, idle and utilization) statistics of the given
 * usage with the values carried by the message.
 *
 * @param workerUsage - The usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0,
    workerUsage.tasks.executed
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0,
    workerUsage.tasks.executed
  )
  // Utilization is only tracked when aggregation is required and the message carries ELU data.
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  // First value is stored as is, following ones are averaged with the stored one.
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
895
/**
 * Chooses a worker node for the next task.
 *
 * When a dynamic worker shall be created, it is created first and used
 * directly if the strategy policy asks for it; otherwise the worker choice
 * strategy picks the worker node.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  if (useDynamicWorker) {
    return dynamicWorkerNodeKey
  }
  return this.workerChoiceStrategyContext.execute()
}
914
6c6afb84
JB
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full,
 * and its worker nodes are all busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
923
/**
 * Sends a message to worker given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
936
/**
 * Instantiates a new worker.
 * Must be implemented by the concrete pool.
 *
 * @returns The newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 943
/**
 * Creates a worker, wires its event handlers, adds it to the pool worker nodes and
 * runs the after-setup hook.
 *
 * The user supplied handlers from the pool options are attached next to the internal
 * ones. On worker error the worker node is flagged as not ready, its channel is closed,
 * the error is emitted on the pool emitter, a replacement worker node is created when
 * `restartWorkerOnError` is enabled and the pool is not starting, and queued tasks are
 * redistributed when the tasks queue is enabled. On worker exit the worker node is
 * removed from the pool.
 *
 * @returns The key of the new, completely set up worker node.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  // User supplied handlers are registered before the matching internal ones.
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', (error) => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    const shallRestart =
      this.opts.restartWorkerOnError === true && !this.starting
    if (shallRestart) {
      // Replace the failed worker with one of the same kind.
      if (!failedWorkerInfo.dynamic) {
        this.createAndSetupWorkerNode()
      } else {
        this.createAndSetupDynamicWorkerNode()
      }
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const createdWorkerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(createdWorkerNodeKey)
  return createdWorkerNodeKey
}
be0676b3 983
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular worker node setup, a message listener is registered that
 * destroys the worker node when a kill message is received: always for a hard kill,
 * and for a soft kill only when the worker node executes no task and, if the tasks
 * queue is enabled, its tasks queue is empty. A `checkActive` message is then sent to
 * the worker, the worker info is flagged as dynamic and, when the strategy policy
 * uses the dynamic worker, as ready.
 *
 * @returns The key of the new, completely set up dynamic worker node.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, (message) => {
    const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const senderUsage = this.workerNodes[senderWorkerNodeKey].usage
    // Kill message received from worker
    const killRequested =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
        senderUsage.tasks.executing === 0 &&
        (this.opts.enableTasksQueue === false ||
          (this.opts.enableTasksQueue === true &&
            this.tasksQueueSize(senderWorkerNodeKey) === 0)))
    if (!killRequested) {
      return
    }
    this.destroyWorkerNode(senderWorkerNodeKey).catch((error) => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  })
  const dynamicWorkerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: dynamicWorkerInfo.id as number
  })
  dynamicWorkerInfo.dynamic = true
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  if (useDynamicWorker) {
    dynamicWorkerInfo.ready = true
  }
  return workerNodeKey
}
1022
/**
 * Registers a message listener callback on the worker identified by its worker node key.
 * Must be implemented by the concrete pool.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - Key of the worker node the listener is registered on.
 * @param listener - The callback invoked with each received message.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053
JB
1035
/**
 * Hook run once a worker node has been newly created.
 * Can be overridden.
 *
 * Registers the pool message listener on the worker, then sends it the startup
 * message followed by the statistics message.
 *
 * @param workerNodeKey - Key of the newly created worker node.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Start listening to the messages coming from the worker.
  const poolMessageListener = this.workerListener()
  this.registerWorkerMessageListener(workerNodeKey, poolMessageListener)
  // Startup message first,
  this.sendStartupMessageToWorker(workerNodeKey)
  // then the statistics message.
  this.sendStatisticsMessageToWorker(workerNodeKey)
}
1050
/**
 * Sends the startup message to the worker identified by its worker node key.
 * Must be implemented by the concrete pool.
 *
 * @param workerNodeKey - Key of the worker node the startup message is sent to.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1057
/**
 * Sends the statistics message to the worker identified by its worker node key.
 *
 * The message carries the `aggregate` flags of the run time and ELU task statistics
 * requirements reported by the worker choice strategy context.
 *
 * @param workerNodeKey - Key of the worker node the statistics message is sent to.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const strategyContext = this.workerChoiceStrategyContext
  const aggregateRunTime =
    strategyContext.getTaskStatisticsRequirements().runTime.aggregate
  const aggregateElu =
    strategyContext.getTaskStatisticsRequirements().elu.aggregate
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: aggregateRunTime,
      elu: aggregateElu
    },
    workerId
  })
}
a2ed5053
JB
1075
/**
 * Moves the tasks queued on the given worker node to the other ready worker nodes.
 *
 * For each queued task, the first other ready worker node with an empty tasks queue
 * is preferred: the task is executed on it right away if it runs fewer tasks than the
 * configured tasks queue concurrency, otherwise the task is enqueued on it. When every
 * other ready worker node already has queued tasks, the task is enqueued on the one
 * with the fewest queued tasks.
 *
 * The redistribution stops as soon as no other ready worker node is available: the
 * remaining tasks stay queued on the given worker node. Moving a task back onto the
 * node it came from would never shrink its queue and would loop forever.
 *
 * @param workerNodeKey - Key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    // -1 means that no eligible target worker node has been found.
    let targetWorkerNodeKey = -1
    let minQueuedTasks = Infinity
    let executeTask = false
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      if (
        workerNodeId === workerNodeKey ||
        !this.getWorkerInfo(workerNodeId).ready
      ) {
        continue
      }
      const queuedTasks = workerNode.usage.tasks.queued
      if (queuedTasks === 0) {
        // Empty queue: best candidate, run the task now if concurrency allows it.
        executeTask =
          workerNode.usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (queuedTasks < minQueuedTasks) {
        minQueuedTasks = queuedTasks
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey === -1) {
      // No other ready worker node: keep the tasks where they are.
      break
    }
    const task = this.dequeueTask(workerNodeKey)
    if (task == null) {
      break
    }
    if (executeTask) {
      this.executeTask(targetWorkerNodeKey, task)
    } else {
      this.enqueueTask(targetWorkerNodeKey, task)
    }
  }
}
1119
/**
 * Builds the listener registered for each worker message.
 *
 * After checking the message worker id, the listener dispatches the message: a ready
 * response (both `ready` and `taskFunctions` set), a task execution response (`taskId`
 * set) or a task functions message (`taskFunctions` set), in that order of precedence.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message) => {
    this.checkMessageWorkerId(message)
    const { ready, taskId, taskFunctions } = message
    if (ready != null && taskFunctions != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctions != null) {
      // Task functions message received from worker
      const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
        message.workerId
      )
      this.getWorkerInfo(senderWorkerNodeKey).taskFunctions = taskFunctions
    }
  }
}
1142
/**
 * Handles the ready response received from a worker: stores the ready flag and the
 * task functions in the worker info, then emits the pool `ready` event if an emitter
 * is set and the pool is ready.
 *
 * @param message - The ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports `ready` as `false`.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, taskFunctions } = message
  if (ready === false) {
    throw new Error(`Worker ${message.workerId} failed to initialize`)
  }
  const readyWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  const readyWorkerInfo = this.getWorkerInfo(readyWorkerNodeKey)
  readyWorkerInfo.ready = ready as boolean
  readyWorkerInfo.taskFunctions = taskFunctions
  if (this.emitter == null) {
    return
  }
  if (this.ready) {
    this.emitter.emit(PoolEvents.ready, this.info)
  }
}
1156
/**
 * Handles a task execution response received from a worker.
 *
 * Settles the promise registered for the task id (rejecting with the task error
 * message and emitting the `taskError` event on failure), runs the after task
 * execution hook, forgets the promise, executes the next queued task of the worker
 * node when the tasks queue is enabled and the concurrency allows it, and finally
 * updates the worker choice strategy context. Responses with no registered promise
 * are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { taskId, taskError, data } = message
  const pendingResponse = this.promiseResponseMap.get(taskId as string)
  if (pendingResponse == null) {
    return
  }
  if (taskError == null) {
    pendingResponse.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    pendingResponse.reject(taskError.message)
  }
  const { workerNodeKey } = pendingResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId as string)
  const shallRunQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (shallRunQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
7c0ba920 1184
/**
 * Emits the pool `busy` event when the pool is busy and the pool `full` event when
 * the pool is dynamic and full. Does nothing if no emitter is set.
 */
private checkAndEmitEvents (): void {
  if (this.emitter == null) {
    return
  }
  if (this.busy) {
    this.emitter.emit(PoolEvents.busy, this.info)
  }
  const dynamicPoolFull = this.type === PoolTypes.dynamic && this.full
  if (dynamicPoolFull) {
    this.emitter.emit(PoolEvents.full, this.info)
  }
}
1195
/**
 * Looks up the information of the worker stored at the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1205
/**
 * Wraps the given worker in a worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const addedWorkerNode = new WorkerNode<Worker, Data>(worker, this.worker)
  // At pool startup, worker nodes are flagged as ready right away.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker node added not found')
}
c923ce56 1226
/**
 * Removes the worker node of the given worker from the pool worker nodes and from
 * the worker choice strategy context. Does nothing if the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
adc3c320 1239
/**
 * Runs the before task execution hook, then sends the given task, with its transfer
 * list, to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
}
1250
/**
 * Enqueues the given task on the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()` method.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const targetWorkerNode = this.workerNodes[workerNodeKey]
  return targetWorkerNode.enqueueTask(task)
}
1254
/**
 * Dequeues a task from the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const sourceWorkerNode = this.workerNodes[workerNodeKey]
  return sourceWorkerNode.dequeueTask()
}
1258
/**
 * Gets the tasks queue size of the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const queriedWorkerNode = this.workerNodes[workerNodeKey]
  return queriedWorkerNode.tasksQueueSize()
}
1262
/**
 * Executes every task queued on the worker node identified by its key, then clears
 * its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  const flushedWorkerNode = this.workerNodes[workerNodeKey]
  flushedWorkerNode.clearTasksQueue()
}
1272
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 1278}