docs: refine worker choice strategies documentation
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
fc3e6586 1import crypto from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
2740a743 3import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
4import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
0d80593b 7 isPlainObject,
bbeadd16
JB
8 median
9} from '../utils'
34a0cfab 10import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
65d7a1c9 11import { CircularArray } from '../circular-array'
29ee7e9a 12import { Queue } from '../queue'
c4855468 13import {
65d7a1c9 14 type IPool,
7c5a1080 15 PoolEmitter,
c4855468 16 PoolEvents,
6b27d407 17 type PoolInfo,
c4855468 18 type PoolOptions,
6b27d407
JB
19 type PoolType,
20 PoolTypes,
184855e6
JB
21 type TasksQueueOptions,
22 type WorkerType
c4855468 23} from './pool'
8604aaab
JB
24import type {
25 IWorker,
26 Task,
27 TaskStatistics,
28 WorkerNode,
29 WorkerUsage
30} from './worker'
a35560ba
S
31import {
32 WorkerChoiceStrategies,
a20f0ba5
JB
33 type WorkerChoiceStrategy,
34 type WorkerChoiceStrategyOptions
bdaf31cd
JB
35} from './selection-strategies/selection-strategies-types'
36import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
c97c7edb 37
729c563d 38/**
ea7a90d3 39 * Base class that implements some shared logic for all poolifier pools.
729c563d 40 *
38e795c1
JB
41 * @typeParam Worker - Type of worker which manages this pool.
42 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
02706357 43 * @typeParam Response - Type of execution response. This can only be serializable data.
729c563d 44 */
c97c7edb 45export abstract class AbstractPool<
f06e48d8 46 Worker extends IWorker,
d3c8a1a8
S
47 Data = unknown,
48 Response = unknown
c4855468 49> implements IPool<Worker, Data, Response> {
afc003b2 50 /** @inheritDoc */
f06e48d8 51 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
4a6952ff 52
afc003b2 53 /** @inheritDoc */
7c0ba920
JB
54 public readonly emitter?: PoolEmitter
55
be0676b3 56 /**
a3445496 57 * The execution response promise map.
be0676b3 58 *
2740a743 59 * - `key`: The message id of each submitted task.
a3445496 60 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 61 *
a3445496 62 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 63 */
c923ce56
JB
64 protected promiseResponseMap: Map<
65 string,
66 PromiseResponseWrapper<Worker, Response>
67 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 68
a35560ba 69 /**
51fe3d3c 70 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
71 */
72 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
73 Worker,
74 Data,
75 Response
a35560ba
S
76 >
77
729c563d
S
/**
 * Builds a new poolifier pool: validates the inputs, prepares the worker
 * choice strategy context and spawns the initial workers.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }
  // Each check throws on the first invalid input it meets.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Pin `this` on the methods below so they can be handed around as plain functions.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  const { enableEvents, workerChoiceStrategy, workerChoiceStrategyOptions } =
    this.opts
  if (enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  // Subclass hook, run before any worker is created.
  this.setupHook()

  for (let created = 0; created < this.numberOfWorkers; created++) {
    this.createAndSetupWorker()
  }
}
121
/**
 * Verifies that a worker file path was supplied.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is `null`, `undefined` or a blank string.
 */
private checkFilePath (filePath: string): void {
  const isMissing = filePath == null
  const isBlank = typeof filePath === 'string' && filePath.trim().length === 0
  if (isMissing || isBlank) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
130
8d3782fa
JB
/**
 * Verifies the requested number of workers.
 *
 * @param numberOfWorkers - Number of workers requested for the pool.
 * @throws Error if the number is missing, or is `0` for a fixed pool.
 * @throws TypeError if the number is not a safe integer.
 * @throws RangeError if the number is negative.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new Error('Cannot instantiate a fixed pool with no worker')
  }
}
148
/**
 * Validates the pool options and fills `this.opts` with defaults for the
 * options that were left unset.
 *
 * @param opts - Options for the pool.
 * @throws TypeError if `opts` is not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
  this.opts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  if (!this.opts.enableTasksQueue) {
    return
  }
  // Tasks queue options are only validated and normalized when the queue is enabled.
  const queueOptions = opts.tasksQueueOptions as TasksQueueOptions
  this.checkValidTasksQueueOptions(queueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(queueOptions)
}
175
/**
 * Verifies that the given strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws Error if the strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
185
0d80593b
JB
/**
 * Verifies the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if weights are given and their count differs from the pool maximum size.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights } = workerChoiceStrategyOptions
  const weightsMismatch =
    weights != null && Object.keys(weights).length !== this.maxSize
  if (weightsMismatch) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
}
203
a20f0ba5
JB
/**
 * Verifies the tasks queue options. Nullish options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws TypeError if the options are set but are not a plain object.
 * @throws Error if the concurrency is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  const isSet = tasksQueueOptions != null
  if (isSet && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  // An undefined concurrency compares false against 0 and is therefore accepted.
  const concurrency = tasksQueueOptions?.concurrency as number
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
218
/** @inheritDoc */
public get info (): PoolInfo {
  const nodes = this.workerNodes
  // Number of worker nodes matching a predicate.
  const countWhere = (
    predicate: (node: WorkerNode<Worker, Data>) => boolean
  ): number => nodes.filter(predicate).length
  // Sum of a numeric value picked from every worker node.
  const sumOf = (pick: (node: WorkerNode<Worker, Data>) => number): number => {
    let total = 0
    for (const node of nodes) {
      total += pick(node)
    }
    return total
  }
  return {
    type: this.type,
    worker: this.worker,
    minSize: this.minSize,
    maxSize: this.maxSize,
    workerNodes: nodes.length,
    idleWorkerNodes: countWhere(
      node => node.workerUsage.tasks.executing === 0
    ),
    busyWorkerNodes: countWhere(node => node.workerUsage.tasks.executing > 0),
    executedTasks: sumOf(node => node.workerUsage.tasks.executed),
    executingTasks: sumOf(node => node.workerUsage.tasks.executing),
    queuedTasks: sumOf(node => node.tasksQueue.size),
    maxQueuedTasks: sumOf(node => node.tasksQueue.maxSize),
    failedTasks: sumOf(node => node.workerUsage.tasks.failed)
  }
}
08f3f44c 267
8881ae32
JB
268 /**
269 * Pool type.
270 *
271 * If it is `'dynamic'`, it provides the `max` property.
272 */
273 protected abstract get type (): PoolType
274
184855e6
JB
275 /**
276 * Gets the worker type.
277 */
278 protected abstract get worker (): WorkerType
279
c2ade475 280 /**
6b27d407 281 * Pool minimum size.
c2ade475 282 */
6b27d407 283 protected abstract get minSize (): number
ff733df7
JB
284
285 /**
6b27d407 286 * Pool maximum size.
ff733df7 287 */
6b27d407 288 protected abstract get maxSize (): number
a35560ba 289
/**
 * Looks up the worker node key (index in `workerNodes`) of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
301
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset every worker node usage and send the statistics flags to each worker again.
  this.workerNodes.forEach(workerNode => {
    const freshUsage = this.getWorkerUsage(workerNode.worker)
    this.setWorkerNodeTasksUsage(workerNode, freshUsage)
    this.setWorkerStatistics(workerNode.worker)
  })
}
323
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first: invalid options must neither be stored nor forwarded.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(workerChoiceStrategyOptions)
}
334
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  if (wasEnabled && !enable) {
    // The queue is being switched off: hand the queued tasks over to the workers.
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
346
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Queue disabled: drop any previously stored queue options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
357
/**
 * Builds normalized tasks queue options from the given ones.
 *
 * @param tasksQueueOptions - The tasks queue options, possibly nullish.
 * @returns Tasks queue options whose `concurrency` defaults to `1` when unset.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
365
c319c66b
JB
/**
 * Whether the pool is full or not.
 *
 * The pool is full once it holds at least `maxSize` worker nodes.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
c2ade475 374
c319c66b
JB
375 /**
376 * Whether the pool is busy or not.
377 *
378 * The pool busyness boolean status.
379 */
380 protected abstract get busy (): boolean
7c0ba920 381
/**
 * Tells whether no worker node is idle.
 *
 * @returns `true` if every worker node is executing at least one task
 * (also `true` when there is no worker node at all), `false` otherwise.
 */
protected internalBusy (): boolean {
  const hasIdleWorkerNode = this.workerNodes.some(
    workerNode => workerNode.workerUsage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
389
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  // Taken before choosing a worker node, so the time spent choosing counts as wait time.
  const timestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const chosenNode = this.workerNodes[workerNodeKey]
  const taskId = crypto.randomUUID()
  const submittedTask: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp,
    id: taskId
  }
  // Register the resolve/reject callbacks before the task can reach the worker.
  const res = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(submittedTask.id as string, {
      resolve,
      reject,
      worker: chosenNode.worker
    })
  })
  // Queue the task only if queueing is on and the pool is busy or the node reached its concurrency limit.
  const mustQueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      chosenNode.workerUsage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (mustQueue) {
    this.enqueueTask(workerNodeKey, submittedTask)
  } else {
    this.executeTask(workerNodeKey, submittedTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return res
}
c97c7edb 424
/** @inheritDoc */
public async destroy (): Promise<void> {
  const shutdowns = this.workerNodes.map(async ({ worker }, workerNodeKey) => {
    // Submit what is still queued before the worker goes away.
    this.flushTasksQueue(workerNodeKey)
    // FIXME: wait for tasks to be finished
    await this.destroyWorker(worker)
  })
  await Promise.all(shutdowns)
}
435
4a6952ff 436 /**
f06e48d8 437 * Shuts down the given worker.
4a6952ff 438 *
f06e48d8 439 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
440 */
441 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 442
/**
 * Hook invoked by the abstract constructor before the worker nodes are created.
 * Does nothing by default; can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // Deliberately a no-op
}
c97c7edb 452
729c563d 453 /**
280c2a77
S
454 * Should return whether the worker is the main worker or not.
455 */
456 protected abstract isMain (): boolean
457
/**
 * Hook run right before a task is sent to its worker: counts the task as
 * executing and updates the wait time usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const { workerUsage } = this.workerNodes[workerNodeKey]
  workerUsage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(workerUsage, task)
}
473
/**
 * Hook run once a task response was received: updates the task counters
 * and the run time and ELU usage of the worker.
 * Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const { workerUsage } = this.workerNodes[workerNodeKey]
  const { tasks } = workerUsage
  tasks.executing -= 1
  tasks.executed += 1
  // A failed task is counted both as executed and as failed.
  if (message.taskError != null) {
    tasks.failed += 1
  }
  this.updateRunTimeWorkerUsage(workerUsage, message)
  this.updateEluWorkerUsage(workerUsage, message)
}
496
a4e07f72
JB
/**
 * Updates the run time usage of a worker from a task response, limited to
 * the statistics the current worker choice strategy requires.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const required =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  // Average and median are only maintained when the aggregate is required.
  if (!required.aggregate) {
    return
  }
  const taskRunTime = message.taskPerformance?.runTime
  const runTimeUsage = workerUsage.runTime
  runTimeUsage.aggregate += taskRunTime ?? 0
  if (required.average && workerUsage.tasks.executed !== 0) {
    runTimeUsage.average = runTimeUsage.aggregate / workerUsage.tasks.executed
  }
  if (required.median && taskRunTime != null) {
    runTimeUsage.history.push(taskRunTime)
    runTimeUsage.median = median(runTimeUsage.history)
  }
}
524
a4e07f72
JB
/**
 * Updates the wait time usage of a worker for a task about to be executed,
 * limited to the statistics the current worker choice strategy requires.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  // A task without a timestamp counts as having waited 0.
  const taskWaitTime = now - (task.timestamp ?? now)
  const required =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  // Average and median are only maintained when the aggregate is required.
  if (!required.aggregate) {
    return
  }
  const waitTimeUsage = workerUsage.waitTime
  waitTimeUsage.aggregate += taskWaitTime
  if (required.average && workerUsage.tasks.executed !== 0) {
    waitTimeUsage.average = waitTimeUsage.aggregate / workerUsage.tasks.executed
  }
  if (required.median) {
    waitTimeUsage.history.push(taskWaitTime)
    waitTimeUsage.median = median(waitTimeUsage.history)
  }
}
554
/**
 * Updates the event loop utilization (ELU) usage of a worker from a task
 * response, limited to the statistics the current worker choice strategy requires.
 *
 * Nothing is updated when the ELU aggregate is not required or when the
 * worker usage carries no ELU statistics.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const required =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  // Guard once up front: the previous fallback branch ran only when
  // `workerUsage.elu` was nullish and then dereferenced it anyway.
  if (!required.aggregate || workerUsage.elu == null) {
    return
  }
  const eluUsage = workerUsage.elu
  const taskElu = message.taskPerformance?.elu
  if (taskElu != null) {
    eluUsage.idle.aggregate += taskElu.idle
    eluUsage.active.aggregate += taskElu.active
    // Utilization is the mean of the previous value and the latest sample.
    eluUsage.utilization = (eluUsage.utilization + taskElu.utilization) / 2
  }
  if (required.average && workerUsage.tasks.executed !== 0) {
    eluUsage.idle.average = eluUsage.idle.aggregate / workerUsage.tasks.executed
    eluUsage.active.average =
      eluUsage.active.aggregate / workerUsage.tasks.executed
  }
  if (required.median && taskElu != null) {
    eluUsage.idle.history.push(taskElu.idle)
    eluUsage.active.history.push(taskElu.active)
    eluUsage.idle.median = median(eluUsage.idle.history)
    eluUsage.active.median = median(eluUsage.active.history)
  }
}
597
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic pool that is not full and has no idle worker node gets a newly
 * created worker; otherwise the worker choice strategy picks the node.
 *
 * @returns The worker node key
 */
protected chooseWorkerNode (): number {
  const mustCreateWorker =
    this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
  if (!mustCreateWorker) {
    return this.workerChoiceStrategyContext.execute()
  }
  const workerCreated = this.createAndSetupWorker()
  this.registerWorkerMessageListener(workerCreated, message => {
    const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated)
    // Evaluated lazily: the worker node is only read when a kill message is present.
    const isIdle = (): boolean =>
      this.workerNodes[currentWorkerNodeKey].workerUsage.tasks.executing === 0
    const mustDestroy =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (message.kill != null && isIdle())
    if (mustDestroy) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      this.flushTasksQueue(currentWorkerNodeKey)
      // FIXME: wait for tasks to be finished
      void (this.destroyWorker(workerCreated) as Promise<void>)
    }
  })
  return this.getWorkerNodeKey(workerCreated)
}
629
280c2a77 630 /**
675bb809 631 * Sends a message to the given worker.
280c2a77 632 *
38e795c1
JB
633 * @param worker - The worker which should receive the message.
634 * @param message - The message.
280c2a77
S
635 */
636 protected abstract sendToWorker (
637 worker: Worker,
638 message: MessageValue<Data>
639 ): void
640
4a6952ff 641 /**
f06e48d8 642 * Registers a listener callback on the given worker.
4a6952ff 643 *
38e795c1
JB
644 * @param worker - The worker which should register a listener.
645 * @param listener - The message listener callback.
4a6952ff
JB
646 */
647 protected abstract registerWorkerMessageListener<
4f7fa42a 648 Message extends Data | Response
78cea37e 649 >(worker: Worker, listener: (message: MessageValue<Message>) => void): void
c97c7edb 650
729c563d
S
651 /**
652 * Returns a newly created worker.
653 */
280c2a77 654 protected abstract createWorker (): Worker
c97c7edb 655
729c563d 656 /**
f06e48d8 657 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
729c563d 658 *
38e795c1 659 * Can be used to update the `maxListeners` or to bind the `main-worker`\<-\>`worker` connection if it is not bound by default.
729c563d 660 *
38e795c1 661 * @param worker - The newly created worker.
729c563d 662 */
280c2a77 663 protected abstract afterWorkerSetup (worker: Worker): void
c97c7edb 664
/**
 * Creates a new worker, attaches the pool event handlers to it and registers
 * it in the pool worker nodes.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()
  const { messageHandler, errorHandler, onlineHandler, exitHandler } =
    this.opts

  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  // The three 'error' listeners stay separate and in this order: user handler, pool event, restart.
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', err => {
    this.emitter?.emit(PoolEvents.error, err)
  })
  worker.on('error', () => {
    // Read at event time, so a later change of the option is honored.
    if (this.opts.restartWorkerOnError === true) {
      this.createAndSetupWorker()
    }
  })
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)

  this.setWorkerStatistics(worker)

  this.afterWorkerSetup(worker)

  return worker
}
be0676b3
APA
699
/**
 * Builds the listener that handles the messages received from a worker.
 *
 * Messages without an id, or whose id has no pending promise, are ignored.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    if (message.id == null) {
      return
    }
    // Task execution response received
    const promiseResponse = this.promiseResponseMap.get(message.id)
    if (promiseResponse == null) {
      return
    }
    const { worker } = promiseResponse
    if (message.taskError == null) {
      promiseResponse.resolve(message.data as Response)
    } else {
      promiseResponse.reject(message.taskError.message)
      this.emitter?.emit(PoolEvents.taskError, message.taskError)
    }
    this.afterTaskExecutionHook(worker, message)
    this.promiseResponseMap.delete(message.id)
    // The worker just finished a task: give it the next queued one, if any.
    const workerNodeKey = this.getWorkerNodeKey(worker)
    const hasQueuedTask =
      this.opts.enableTasksQueue === true &&
      this.tasksQueueSize(workerNodeKey) > 0
    if (hasQueuedTask) {
      this.executeTask(
        workerNodeKey,
        this.dequeueTask(workerNodeKey) as Task<Data>
      )
    }
  }
}
7c0ba920 735
/**
 * Emits the `busy` and `full` pool events when the pool is in the matching
 * state. Does nothing when the pool has no emitter.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  // The full event is only emitted by dynamic pools.
  if (this.type === PoolTypes.dynamic && this.full) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
746
0ebe2a9f
JB
/**
 * Replaces the usage object of the given worker node.
 *
 * @param workerNode - The worker node.
 * @param workerUsage - The worker usage to store on the node.
 */
private setWorkerNodeTasksUsage (
  workerNode: WorkerNode<Worker, Data>,
  workerUsage: WorkerUsage
): void {
  const node = workerNode
  node.workerUsage = workerUsage
}
759
/**
 * Appends a worker node for the given worker to the pool worker nodes,
 * with a fresh usage object and an empty tasks queue.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private pushWorkerNode (worker: Worker): number {
  const workerNode: WorkerNode<Worker, Data> = {
    worker,
    workerUsage: this.getWorkerUsage(worker),
    tasksQueue: new Queue<Task<Data>>()
  }
  return this.workerNodes.push(workerNode)
}
c923ce56 773
8604aaab
JB
774 // /**
775 // * Sets the given worker in the pool worker nodes.
776 // *
777 // * @param workerNodeKey - The worker node key.
778 // * @param worker - The worker.
779 // * @param workerUsage - The worker usage.
780 // * @param tasksQueue - The worker task queue.
781 // */
782 // private setWorkerNode (
783 // workerNodeKey: number,
784 // worker: Worker,
785 // workerUsage: WorkerUsage,
786 // tasksQueue: Queue<Task<Data>>
787 // ): void {
788 // this.workerNodes[workerNodeKey] = {
789 // worker,
790 // workerUsage,
791 // tasksQueue
792 // }
793 // }
51fe3d3c
JB
794
/**
 * Removes the worker node of the given worker from the pool worker nodes
 * and from the worker choice strategy context. An unknown worker is ignored.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
adc3c320 807
/**
 * Runs the pre-execution hook, then sends the task to the worker of the given node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
812
/**
 * Adds a task to the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to queue.
 * @returns The value returned by the queue `enqueue()` call.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.enqueue(task)
}
816
/**
 * Takes the next task out of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the queue `dequeue()` call, possibly `undefined`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.dequeue()
}
820
/**
 * Gets the size of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.size
}
ff733df7 824
416fd65c
JB
/**
 * Flushes the tasks queue of the given worker node: every queued task is
 * dequeued and executed on the node's worker.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  // Loop on the live queue size. An index counter compared against a queue that
  // shrinks on every dequeue stops halfway and leaves tasks behind.
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
835
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
b6b32453
JB
841
/**
 * Sends to the given worker the statistics flags (run time, ELU) taken from
 * the aggregate requirements of the current worker choice strategy.
 *
 * @param worker - The worker to notify.
 */
private setWorkerStatistics (worker: Worker): void {
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  this.sendToWorker(worker, {
    statistics: {
      runTime: runTime.aggregate,
      elu: elu.aggregate
    }
  })
}
8604aaab
JB
853
/**
 * Builds a zeroed usage object for the given worker: task counters plus
 * run time, wait time and ELU statistics, each with an empty history.
 *
 * @param worker - The worker.
 * @returns The fresh worker usage.
 */
private getWorkerUsage (worker: Worker): WorkerUsage {
  const freshUsage: WorkerUsage = {
    tasks: this.getTaskStatistics(worker),
    runTime: {
      aggregate: 0,
      average: 0,
      median: 0,
      history: new CircularArray()
    },
    waitTime: {
      aggregate: 0,
      average: 0,
      median: 0,
      history: new CircularArray()
    },
    elu: {
      idle: {
        aggregate: 0,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      active: {
        aggregate: 0,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      utilization: 0
    }
  }
  return freshUsage
}
886
1c6fe997
JB
/**
 * Builds zeroed task statistics for the given worker.
 *
 * `queued` is a getter resolved on every read from the tasks queue of the
 * worker's node, so it follows the live queue size. It yields `0` while the
 * worker has no node in the pool.
 *
 * @param worker - The worker.
 * @returns The fresh task statistics.
 */
private getTaskStatistics (worker: Worker): TaskStatistics {
  // Look the queue up lazily. Capturing the size here would freeze it at creation
  // time, which is before the worker node exists when called while pushing a node.
  const readQueueSize = (): number =>
    this.workerNodes[this.getWorkerNodeKey(worker)]?.tasksQueue?.size ?? 0
  return {
    executed: 0,
    executing: 0,
    get queued (): number {
      return readQueueSize()
    },
    failed: 0
  }
}
c97c7edb 899}