fix: ensure no tasks are queued when trying to soft kill a dynamic worker
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
fc3e6586 1import crypto from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
2740a743 3import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
4import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
0d80593b 7 isPlainObject,
bbeadd16
JB
8 median
9} from '../utils'
34a0cfab 10import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
65d7a1c9 11import { CircularArray } from '../circular-array'
29ee7e9a 12import { Queue } from '../queue'
c4855468 13import {
65d7a1c9 14 type IPool,
7c5a1080 15 PoolEmitter,
c4855468 16 PoolEvents,
6b27d407 17 type PoolInfo,
c4855468 18 type PoolOptions,
6b27d407
JB
19 type PoolType,
20 PoolTypes,
184855e6
JB
21 type TasksQueueOptions,
22 type WorkerType
c4855468 23} from './pool'
8604aaab
JB
24import type {
25 IWorker,
26 Task,
27 TaskStatistics,
28 WorkerNode,
29 WorkerUsage
30} from './worker'
a35560ba
S
31import {
32 WorkerChoiceStrategies,
a20f0ba5
JB
33 type WorkerChoiceStrategy,
34 type WorkerChoiceStrategyOptions
bdaf31cd
JB
35} from './selection-strategies/selection-strategies-types'
36import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
c97c7edb 37
/**
 * Base class that implements some shared logic for all poolifier pools.
 *
 * @typeParam Worker - Type of worker which manages this pool.
 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
 * @typeParam Response - Type of execution response. This can only be serializable data.
 */
export abstract class AbstractPool<
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
> implements IPool<Worker, Data, Response> {
  /** @inheritDoc */
  public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []

  /** @inheritDoc */
  public readonly emitter?: PoolEmitter

  /**
   * The execution response promise map.
   *
   * - `key`: The message id of each submitted task.
   * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
   *
   * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
   */
  protected promiseResponseMap: Map<
    string,
    PromiseResponseWrapper<Worker, Response>
  > = new Map<string, PromiseResponseWrapper<Worker, Response>>()

  /**
   * Worker choice strategy context referencing a worker choice algorithm implementation.
   */
  protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
  >

  /**
   * Constructs a new poolifier pool.
   *
   * Validates the arguments, creates the event emitter (when events are enabled) and the
   * worker choice strategy context, runs `setupHook()` and finally creates
   * `numberOfWorkers` worker nodes.
   *
   * @param numberOfWorkers - Number of workers that this pool should manage.
   * @param filePath - Path to the worker file.
   * @param opts - Options for the pool.
   */
  public constructor (
    protected readonly numberOfWorkers: number,
    protected readonly filePath: string,
    protected readonly opts: PoolOptions<Worker>
  ) {
    if (!this.isMain()) {
      throw new Error('Cannot start a pool from a worker!')
    }
    this.checkNumberOfWorkers(this.numberOfWorkers)
    this.checkFilePath(this.filePath)
    this.checkPoolOptions(this.opts)

    this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
    this.executeTask = this.executeTask.bind(this)
    this.enqueueTask = this.enqueueTask.bind(this)
    this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

    if (this.opts.enableEvents === true) {
      this.emitter = new PoolEmitter()
    }
    this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
      Worker,
      Data,
      Response
    >(
      this,
      this.opts.workerChoiceStrategy,
      this.opts.workerChoiceStrategyOptions
    )

    this.setupHook()

    for (let i = 1; i <= this.numberOfWorkers; i++) {
      this.createAndSetupWorker()
    }
  }

  /**
   * Checks that a worker file path has been given.
   *
   * @param filePath - Path to the worker file.
   * @throws Error if the path is `null`/`undefined` or a blank string.
   */
  private checkFilePath (filePath: string): void {
    if (
      filePath == null ||
      (typeof filePath === 'string' && filePath.trim().length === 0)
    ) {
      throw new Error('Please specify a file with a worker implementation')
    }
  }

  /**
   * Checks that the requested number of workers is usable.
   *
   * @param numberOfWorkers - Number of workers requested for the pool.
   * @throws Error if the number is missing, or is zero for a fixed pool.
   * @throws TypeError if the number is not a safe integer.
   * @throws RangeError if the number is negative.
   */
  private checkNumberOfWorkers (numberOfWorkers: number): void {
    if (numberOfWorkers == null) {
      throw new Error(
        'Cannot instantiate a pool without specifying the number of workers'
      )
    } else if (!Number.isSafeInteger(numberOfWorkers)) {
      throw new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    } else if (numberOfWorkers < 0) {
      throw new RangeError(
        'Cannot instantiate a pool with a negative number of workers'
      )
    } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
      throw new Error('Cannot instantiate a fixed pool with no worker')
    }
  }

  /**
   * Validates the pool options and fills in defaults directly on `this.opts`.
   *
   * Defaults applied: round robin worker choice strategy, default worker choice strategy
   * options, `restartWorkerOnError: true`, `enableEvents: true`, `enableTasksQueue: false`.
   *
   * @param opts - Options given to the pool constructor.
   * @throws TypeError if `opts` is not a plain object.
   */
  private checkPoolOptions (opts: PoolOptions<Worker>): void {
    if (isPlainObject(opts)) {
      this.opts.workerChoiceStrategy =
        opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
      this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
      this.opts.workerChoiceStrategyOptions =
        opts.workerChoiceStrategyOptions ??
        DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
      this.checkValidWorkerChoiceStrategyOptions(
        this.opts.workerChoiceStrategyOptions
      )
      this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
      this.opts.enableEvents = opts.enableEvents ?? true
      this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
      if (this.opts.enableTasksQueue) {
        this.checkValidTasksQueueOptions(
          opts.tasksQueueOptions as TasksQueueOptions
        )
        this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
          opts.tasksQueueOptions as TasksQueueOptions
        )
      }
    } else {
      throw new TypeError('Invalid pool options: must be a plain object')
    }
  }

  /**
   * Checks that the given strategy is one of the `WorkerChoiceStrategies` values.
   *
   * @param workerChoiceStrategy - The worker choice strategy to validate.
   * @throws Error if the strategy is unknown.
   */
  private checkValidWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy
  ): void {
    if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
      throw new Error(
        `Invalid worker choice strategy '${workerChoiceStrategy}'`
      )
    }
  }

  /**
   * Checks the worker choice strategy options.
   *
   * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
   * @throws TypeError if the options are not a plain object.
   * @throws Error if weights are given but their count differs from the pool maximum size.
   */
  private checkValidWorkerChoiceStrategyOptions (
    workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
  ): void {
    if (!isPlainObject(workerChoiceStrategyOptions)) {
      throw new TypeError(
        'Invalid worker choice strategy options: must be a plain object'
      )
    }
    if (
      workerChoiceStrategyOptions.weights != null &&
      Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
    ) {
      throw new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    }
  }

  /**
   * Checks the tasks queue options. `null`/`undefined` options are accepted.
   *
   * @param tasksQueueOptions - The tasks queue options to validate.
   * @throws TypeError if the options are given but are not a plain object.
   * @throws Error if the concurrency is lower than or equal to zero.
   */
  private checkValidTasksQueueOptions (
    tasksQueueOptions: TasksQueueOptions
  ): void {
    if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
      throw new TypeError('Invalid tasks queue options: must be a plain object')
    }
    // An undefined concurrency compares false here, so it is accepted
    if ((tasksQueueOptions?.concurrency as number) <= 0) {
      throw new Error(
        `Invalid worker tasks concurrency '${
          tasksQueueOptions.concurrency as number
        }'`
      )
    }
  }

  /** @inheritDoc */
  public get info (): PoolInfo {
    return {
      type: this.type,
      worker: this.worker,
      minSize: this.minSize,
      maxSize: this.maxSize,
      workerNodes: this.workerNodes.length,
      // A worker node is idle when it executes no task
      idleWorkerNodes: this.workerNodes.reduce(
        (accumulator, workerNode) =>
          workerNode.workerUsage.tasks.executing === 0
            ? accumulator + 1
            : accumulator,
        0
      ),
      // A worker node is busy when it executes at least one task
      busyWorkerNodes: this.workerNodes.reduce(
        (accumulator, workerNode) =>
          workerNode.workerUsage.tasks.executing > 0
            ? accumulator + 1
            : accumulator,
        0
      ),
      executedTasks: this.workerNodes.reduce(
        (accumulator, workerNode) =>
          accumulator + workerNode.workerUsage.tasks.executed,
        0
      ),
      executingTasks: this.workerNodes.reduce(
        (accumulator, workerNode) =>
          accumulator + workerNode.workerUsage.tasks.executing,
        0
      ),
      queuedTasks: this.workerNodes.reduce(
        (accumulator, workerNode) => accumulator + workerNode.tasksQueue.size,
        0
      ),
      maxQueuedTasks: this.workerNodes.reduce(
        (accumulator, workerNode) =>
          accumulator + workerNode.tasksQueue.maxSize,
        0
      ),
      failedTasks: this.workerNodes.reduce(
        (accumulator, workerNode) =>
          accumulator + workerNode.workerUsage.tasks.failed,
        0
      )
    }
  }

  /**
   * Pool type.
   *
   * If it is `'dynamic'`, it provides the `max` property.
   */
  protected abstract get type (): PoolType

  /**
   * Gets the worker type.
   */
  protected abstract get worker (): WorkerType

  /**
   * Pool minimum size.
   */
  protected abstract get minSize (): number

  /**
   * Pool maximum size.
   */
  protected abstract get maxSize (): number

  /**
   * Gets the given worker its worker node key.
   *
   * @param worker - The worker.
   * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
   */
  private getWorkerNodeKey (worker: Worker): number {
    return this.workerNodes.findIndex(
      workerNode => workerNode.worker === worker
    )
  }

  /** @inheritDoc */
  public setWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy,
    workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
  ): void {
    this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
    this.opts.workerChoiceStrategy = workerChoiceStrategy
    this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
      this.opts.workerChoiceStrategy
    )
    if (workerChoiceStrategyOptions != null) {
      this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
    }
    // Reset each worker node usage and send the new statistics requirements to its worker
    for (const workerNode of this.workerNodes) {
      this.setWorkerNodeTasksUsage(
        workerNode,
        this.getWorkerUsage(workerNode.worker)
      )
      this.setWorkerStatistics(workerNode.worker)
    }
  }

  /** @inheritDoc */
  public setWorkerChoiceStrategyOptions (
    workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
  ): void {
    this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
    this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
    this.workerChoiceStrategyContext.setOptions(
      this.opts.workerChoiceStrategyOptions
    )
  }

  /** @inheritDoc */
  public enableTasksQueue (
    enable: boolean,
    tasksQueueOptions?: TasksQueueOptions
  ): void {
    // Queued tasks are executed before the queue is disabled
    if (this.opts.enableTasksQueue === true && !enable) {
      this.flushTasksQueues()
    }
    this.opts.enableTasksQueue = enable
    this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
  }

  /** @inheritDoc */
  public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
    if (this.opts.enableTasksQueue === true) {
      this.checkValidTasksQueueOptions(tasksQueueOptions)
      this.opts.tasksQueueOptions =
        this.buildTasksQueueOptions(tasksQueueOptions)
    } else if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
  }

  /**
   * Builds the effective tasks queue options.
   *
   * @param tasksQueueOptions - The user supplied tasks queue options, possibly nullish.
   * @returns The tasks queue options with `concurrency` defaulting to `1`.
   */
  private buildTasksQueueOptions (
    tasksQueueOptions: TasksQueueOptions
  ): TasksQueueOptions {
    return {
      concurrency: tasksQueueOptions?.concurrency ?? 1
    }
  }

  /**
   * Whether the pool is full or not.
   *
   * The pool filling boolean status.
   */
  protected get full (): boolean {
    return this.workerNodes.length >= this.maxSize
  }

  /**
   * Whether the pool is busy or not.
   *
   * The pool busyness boolean status.
   */
  protected abstract get busy (): boolean

  /**
   * Whether worker nodes are executing at least one task.
   *
   * @returns Worker nodes busyness boolean status: `true` when no worker node is idle.
   */
  protected internalBusy (): boolean {
    return (
      this.workerNodes.findIndex(workerNode => {
        return workerNode.workerUsage.tasks.executing === 0
      }) === -1
    )
  }

  /** @inheritDoc */
  public async execute (data?: Data, name?: string): Promise<Response> {
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const submittedTask: Task<Data> = {
      name,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      timestamp,
      id: crypto.randomUUID()
    }
    // The promise is settled by workerListener() when the response with the task id arrives
    const res = new Promise<Response>((resolve, reject) => {
      this.promiseResponseMap.set(submittedTask.id as string, {
        resolve,
        reject,
        worker: this.workerNodes[workerNodeKey].worker
      })
    })
    // Enqueue when the pool is busy or the chosen worker node reached its concurrency
    if (
      this.opts.enableTasksQueue === true &&
      (this.busy ||
        this.workerNodes[workerNodeKey].workerUsage.tasks.executing >=
          ((this.opts.tasksQueueOptions as TasksQueueOptions)
            .concurrency as number))
    ) {
      this.enqueueTask(workerNodeKey, submittedTask)
    } else {
      this.executeTask(workerNodeKey, submittedTask)
    }
    this.workerChoiceStrategyContext.update(workerNodeKey)
    this.checkAndEmitEvents()
    // eslint-disable-next-line @typescript-eslint/return-await
    return res
  }

  /** @inheritDoc */
  public async destroy (): Promise<void> {
    await Promise.all(
      this.workerNodes.map(async (workerNode, workerNodeKey) => {
        this.flushTasksQueue(workerNodeKey)
        // FIXME: wait for tasks to be finished
        await this.destroyWorker(workerNode.worker)
      })
    )
  }

  /**
   * Terminates the given worker.
   *
   * @param worker - A worker within `workerNodes`.
   */
  protected abstract destroyWorker (worker: Worker): void | Promise<void>

  /**
   * Setup hook to execute code before worker node are created in the abstract constructor.
   * Can be overridden
   *
   * @virtual
   */
  protected setupHook (): void {
    // Intentionally empty
  }

  /**
   * Should return whether the worker is the main worker or not.
   */
  protected abstract isMain (): boolean

  /**
   * Hook executed before the worker task execution.
   * Can be overridden.
   *
   * Increments the worker node executing tasks counter and updates its wait time usage.
   *
   * @param workerNodeKey - The worker node key.
   * @param task - The task to execute.
   */
  protected beforeTaskExecutionHook (
    workerNodeKey: number,
    task: Task<Data>
  ): void {
    const workerUsage = this.workerNodes[workerNodeKey].workerUsage
    ++workerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerUsage, task)
  }

  /**
   * Hook executed after the worker task execution.
   * Can be overridden.
   *
   * Updates the worker node task counters, run time and ELU usage.
   *
   * @param worker - The worker.
   * @param message - The received message.
   */
  protected afterTaskExecutionHook (
    worker: Worker,
    message: MessageValue<Response>
  ): void {
    const workerUsage =
      this.workerNodes[this.getWorkerNodeKey(worker)].workerUsage
    this.updateTaskStatisticsWorkerUsage(workerUsage, message)
    this.updateRunTimeWorkerUsage(workerUsage, message)
    this.updateEluWorkerUsage(workerUsage, message)
  }

  /**
   * Updates the task counters once a task response has been received.
   *
   * A failed task is counted both as executed and as failed.
   *
   * @param workerUsage - The worker usage to update.
   * @param message - The received message.
   */
  private updateTaskStatisticsWorkerUsage (
    workerUsage: WorkerUsage,
    message: MessageValue<Response>
  ): void {
    const workerTaskStatistics = workerUsage.tasks
    --workerTaskStatistics.executing
    ++workerTaskStatistics.executed
    if (message.taskError != null) {
      ++workerTaskStatistics.failed
    }
  }

  /**
   * Updates the run time statistics required by the current worker choice strategy.
   *
   * @param workerUsage - The worker usage to update.
   * @param message - The received message carrying the task performance.
   */
  private updateRunTimeWorkerUsage (
    workerUsage: WorkerUsage,
    message: MessageValue<Response>
  ): void {
    if (
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
        .aggregate
    ) {
      workerUsage.runTime.aggregate += message.taskPerformance?.runTime ?? 0
      // Failed tasks are excluded from the average; skip it when no task succeeded
      // so the division never yields NaN or Infinity
      const successfulTasks =
        workerUsage.tasks.executed - workerUsage.tasks.failed
      if (
        this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
          .average &&
        successfulTasks > 0
      ) {
        workerUsage.runTime.average =
          workerUsage.runTime.aggregate / successfulTasks
      }
      if (
        this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
          .median &&
        message.taskPerformance?.runTime != null
      ) {
        workerUsage.runTime.history.push(message.taskPerformance.runTime)
        workerUsage.runTime.median = median(workerUsage.runTime.history)
      }
    }
  }

  /**
   * Updates the wait time statistics required by the current worker choice strategy.
   *
   * The wait time is the delay between the task timestamp and now; a task without
   * timestamp has a wait time of zero.
   *
   * @param workerUsage - The worker usage to update.
   * @param task - The task about to be executed.
   */
  private updateWaitTimeWorkerUsage (
    workerUsage: WorkerUsage,
    task: Task<Data>
  ): void {
    const timestamp = performance.now()
    const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
    if (
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
        .aggregate
    ) {
      workerUsage.waitTime.aggregate += taskWaitTime
      // Failed tasks are excluded from the average; skip it when no task succeeded
      // so the division never yields NaN or Infinity
      const successfulTasks =
        workerUsage.tasks.executed - workerUsage.tasks.failed
      if (
        this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .waitTime.average &&
        successfulTasks > 0
      ) {
        workerUsage.waitTime.average =
          workerUsage.waitTime.aggregate / successfulTasks
      }
      if (
        this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .waitTime.median
      ) {
        workerUsage.waitTime.history.push(taskWaitTime)
        workerUsage.waitTime.median = median(workerUsage.waitTime.history)
      }
    }
  }

  /**
   * Updates the event loop utilization (ELU) statistics required by the current worker
   * choice strategy.
   *
   * @param workerUsage - The worker usage to update.
   * @param message - The received message carrying the task performance.
   */
  private updateEluWorkerUsage (
    workerUsage: WorkerUsage,
    message: MessageValue<Response>
  ): void {
    if (
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
        .aggregate
    ) {
      if (workerUsage.elu != null && message.taskPerformance?.elu != null) {
        workerUsage.elu.idle.aggregate += message.taskPerformance.elu.idle
        workerUsage.elu.active.aggregate += message.taskPerformance.elu.active
        // Utilization is the mean of the previous value and the new sample
        workerUsage.elu.utilization =
          (workerUsage.elu.utilization +
            message.taskPerformance.elu.utilization) /
          2
      } else if (message.taskPerformance?.elu != null) {
        // NOTE(review): only reachable when workerUsage.elu is nullish, which
        // getWorkerUsage() never produces; the assignments below would throw in that case
        workerUsage.elu.idle.aggregate = message.taskPerformance.elu.idle
        workerUsage.elu.active.aggregate = message.taskPerformance.elu.active
        workerUsage.elu.utilization = message.taskPerformance.elu.utilization
      }
      // Failed tasks are excluded from the averages; skip them when no task succeeded
      // so the division never yields NaN or Infinity
      const successfulTasks =
        workerUsage.tasks.executed - workerUsage.tasks.failed
      if (
        this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
          .average &&
        successfulTasks > 0
      ) {
        workerUsage.elu.idle.average =
          workerUsage.elu.idle.aggregate / successfulTasks
        workerUsage.elu.active.average =
          workerUsage.elu.active.aggregate / successfulTasks
      }
      if (
        this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
          .median &&
        message.taskPerformance?.elu != null
      ) {
        workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
        workerUsage.elu.active.history.push(message.taskPerformance.elu.active)
        workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
        workerUsage.elu.active.median = median(workerUsage.elu.active.history)
      }
    }
  }

  /**
   * Chooses a worker node for the next task.
   *
   * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
   *
   * A dynamic worker is created first when `shallCreateDynamicWorker()` allows it; it is
   * chosen directly only if the strategy policy has `useDynamicWorker` set.
   *
   * @returns The worker node key
   */
  private chooseWorkerNode (): number {
    if (this.shallCreateDynamicWorker()) {
      const worker = this.createAndSetupDynamicWorker()
      if (
        this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
      ) {
        return this.getWorkerNodeKey(worker)
      }
    }
    return this.workerChoiceStrategyContext.execute()
  }

  /**
   * Conditions for dynamic worker creation.
   *
   * @returns Whether to create a dynamic worker or not.
   */
  private shallCreateDynamicWorker (): boolean {
    return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
  }

  /**
   * Sends a message to the given worker.
   *
   * @param worker - The worker which should receive the message.
   * @param message - The message.
   */
  protected abstract sendToWorker (
    worker: Worker,
    message: MessageValue<Data>
  ): void

  /**
   * Registers a listener callback on the given worker.
   *
   * @param worker - The worker which should register a listener.
   * @param listener - The message listener callback.
   */
  protected abstract registerWorkerMessageListener<
    Message extends Data | Response
  >(worker: Worker, listener: (message: MessageValue<Message>) => void): void

  /**
   * Creates a new worker.
   *
   * @returns Newly created worker.
   */
  protected abstract createWorker (): Worker

  /**
   * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
   *
   * Can be used to update the `maxListeners` or binding the `main-worker`\<-\>`worker` connection if not bind by default.
   *
   * @param worker - The newly created worker.
   */
  protected abstract afterWorkerSetup (worker: Worker): void

  /**
   * Creates a new worker and sets it up completely in the pool worker nodes.
   *
   * @returns New, completely set up worker.
   */
  protected createAndSetupWorker (): Worker {
    const worker = this.createWorker()

    worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
    worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
    worker.on('error', error => {
      if (this.emitter != null) {
        this.emitter.emit(PoolEvents.error, error)
      }
      if (this.opts.restartWorkerOnError === true) {
        this.createAndSetupWorker()
      }
    })
    worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
    worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
    worker.once('exit', () => {
      this.removeWorkerNode(worker)
    })

    this.pushWorkerNode(worker)

    this.setWorkerStatistics(worker)

    this.afterWorkerSetup(worker)

    return worker
  }

  /**
   * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
   *
   * The worker is destroyed when it sends a hard kill message, or any kill message while
   * it executes no task and, if the tasks queue is enabled, has no queued task.
   *
   * @returns New, completely set up dynamic worker.
   */
  protected createAndSetupDynamicWorker (): Worker {
    const worker = this.createAndSetupWorker()
    this.registerWorkerMessageListener(worker, message => {
      const currentWorkerNodeKey = this.getWorkerNodeKey(worker)
      if (
        isKillBehavior(KillBehaviors.HARD, message.kill) ||
        (this.opts.enableTasksQueue === false &&
          message.kill != null &&
          this.workerNodes[currentWorkerNodeKey].workerUsage.tasks.executing ===
            0) ||
        (this.opts.enableTasksQueue === true &&
          message.kill != null &&
          this.workerNodes[currentWorkerNodeKey].workerUsage.tasks.executing ===
            0 &&
          this.tasksQueueSize(currentWorkerNodeKey) === 0)
      ) {
        // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
        void (this.destroyWorker(worker) as Promise<void>)
      }
    })
    return worker
  }

  /**
   * This function is the listener registered for each worker message.
   *
   * @returns The listener function to execute when a message is received from a worker.
   */
  protected workerListener (): (message: MessageValue<Response>) => void {
    return message => {
      if (message.id != null) {
        // Task execution response received
        const promiseResponse = this.promiseResponseMap.get(message.id)
        if (promiseResponse != null) {
          if (message.taskError != null) {
            promiseResponse.reject(message.taskError.message)
            if (this.emitter != null) {
              this.emitter.emit(PoolEvents.taskError, message.taskError)
            }
          } else {
            promiseResponse.resolve(message.data as Response)
          }
          this.afterTaskExecutionHook(promiseResponse.worker, message)
          this.promiseResponseMap.delete(message.id)
          // The worker node just finished a task: feed it its next queued task, if any
          const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
          if (
            this.opts.enableTasksQueue === true &&
            this.tasksQueueSize(workerNodeKey) > 0
          ) {
            this.executeTask(
              workerNodeKey,
              this.dequeueTask(workerNodeKey) as Task<Data>
            )
          }
        }
      }
    }
  }

  /**
   * Emits the `busy` and `full` pool events when the pool is in the matching state.
   * Does nothing when the pool has no emitter.
   */
  private checkAndEmitEvents (): void {
    if (this.emitter != null) {
      if (this.busy) {
        this.emitter.emit(PoolEvents.busy, this.info)
      }
      if (this.type === PoolTypes.dynamic && this.full) {
        this.emitter.emit(PoolEvents.full, this.info)
      }
    }
  }

  /**
   * Sets the given worker node its tasks usage in the pool.
   *
   * @param workerNode - The worker node.
   * @param workerUsage - The worker usage.
   */
  private setWorkerNodeTasksUsage (
    workerNode: WorkerNode<Worker, Data>,
    workerUsage: WorkerUsage
  ): void {
    workerNode.workerUsage = workerUsage
  }

  /**
   * Pushes the given worker in the pool worker nodes.
   *
   * @param worker - The worker.
   * @returns The worker nodes length.
   */
  private pushWorkerNode (worker: Worker): number {
    return this.workerNodes.push({
      worker,
      workerUsage: this.getWorkerUsage(worker),
      tasksQueue: new Queue<Task<Data>>()
    })
  }

  // /**
  //  * Sets the given worker in the pool worker nodes.
  //  *
  //  * @param workerNodeKey - The worker node key.
  //  * @param worker - The worker.
  //  * @param workerUsage - The worker usage.
  //  * @param tasksQueue - The worker task queue.
  //  */
  // private setWorkerNode (
  //   workerNodeKey: number,
  //   worker: Worker,
  //   workerUsage: WorkerUsage,
  //   tasksQueue: Queue<Task<Data>>
  // ): void {
  //   this.workerNodes[workerNodeKey] = {
  //     worker,
  //     workerUsage,
  //     tasksQueue
  //   }
  // }

  /**
   * Removes the given worker from the pool worker nodes.
   *
   * @param worker - The worker.
   */
  private removeWorkerNode (worker: Worker): void {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    if (workerNodeKey !== -1) {
      this.workerNodes.splice(workerNodeKey, 1)
      this.workerChoiceStrategyContext.remove(workerNodeKey)
    }
  }

  /**
   * Runs the before task execution hook, then sends the task to the worker node worker.
   *
   * @param workerNodeKey - The worker node key.
   * @param task - The task to execute.
   */
  private executeTask (workerNodeKey: number, task: Task<Data>): void {
    this.beforeTaskExecutionHook(workerNodeKey, task)
    this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
  }

  /**
   * Enqueues the task in the worker node tasks queue.
   *
   * @param workerNodeKey - The worker node key.
   * @param task - The task to enqueue.
   * @returns The value returned by the queue `enqueue()` method.
   */
  private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
    return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
  }

  /**
   * Dequeues the next task from the worker node tasks queue.
   *
   * @param workerNodeKey - The worker node key.
   * @returns The dequeued task, if any.
   */
  private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
    return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
  }

  /**
   * Gets the worker node tasks queue size.
   *
   * @param workerNodeKey - The worker node key.
   * @returns The tasks queue size.
   */
  private tasksQueueSize (workerNodeKey: number): number {
    return this.workerNodes[workerNodeKey].tasksQueue.size
  }

  /**
   * Executes every task queued on the given worker node.
   *
   * @param workerNodeKey - The worker node key.
   */
  private flushTasksQueue (workerNodeKey: number): void {
    // Drain until empty: an index based loop would re-read the shrinking size at each
    // iteration and stop after roughly half of the queued tasks
    while (this.tasksQueueSize(workerNodeKey) > 0) {
      this.executeTask(
        workerNodeKey,
        this.dequeueTask(workerNodeKey) as Task<Data>
      )
    }
  }

  /**
   * Executes every task queued on every worker node.
   */
  private flushTasksQueues (): void {
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.flushTasksQueue(workerNodeKey)
    }
  }

  /**
   * Sends to the given worker which statistics the current worker choice strategy requires.
   *
   * @param worker - The worker.
   */
  private setWorkerStatistics (worker: Worker): void {
    this.sendToWorker(worker, {
      statistics: {
        runTime:
          this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
            .runTime.aggregate,
        elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .elu.aggregate
      }
    })
  }

  /**
   * Builds a zeroed worker usage for the given worker.
   *
   * @param worker - The worker.
   * @returns The initial worker usage.
   */
  private getWorkerUsage (worker: Worker): WorkerUsage {
    return {
      tasks: this.getTaskStatistics(worker),
      runTime: {
        aggregate: 0,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      waitTime: {
        aggregate: 0,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      elu: {
        idle: {
          aggregate: 0,
          average: 0,
          median: 0,
          history: new CircularArray()
        },
        active: {
          aggregate: 0,
          average: 0,
          median: 0,
          history: new CircularArray()
        },
        utilization: 0
      }
    }
  }

  /**
   * Builds zeroed task statistics for the given worker.
   *
   * @param worker - The worker.
   * @returns The initial task statistics; `queued` reflects the live tasks queue size.
   */
  private getTaskStatistics (worker: Worker): TaskStatistics {
    // Resolved on each read: the worker node may not be registered yet when the statistics
    // are created, and its queue size changes over time. The arrow function keeps the
    // pool as `this`, whereas the getter below is bound to the returned object.
    const getQueueSize = (): number | undefined =>
      this.workerNodes[this.getWorkerNodeKey(worker)]?.tasksQueue?.size
    return {
      executed: 0,
      executing: 0,
      get queued (): number {
        return getQueueSize() ?? 0
      },
      failed: 0
    }
  }
}
c97c7edb 939}