Merge branch 'master' into worker-info
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
fc3e6586 1import crypto from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
2740a743 3import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
4import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
0d80593b 7 isPlainObject,
afe0d5bf
JB
8 median,
9 round
bbeadd16 10} from '../utils'
34a0cfab 11import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
65d7a1c9 12import { CircularArray } from '../circular-array'
29ee7e9a 13import { Queue } from '../queue'
c4855468 14import {
65d7a1c9 15 type IPool,
7c5a1080 16 PoolEmitter,
c4855468 17 PoolEvents,
6b27d407 18 type PoolInfo,
c4855468 19 type PoolOptions,
6b27d407
JB
20 type PoolType,
21 PoolTypes,
184855e6 22 type TasksQueueOptions,
83fa0a36
JB
23 type WorkerType,
24 WorkerTypes
c4855468 25} from './pool'
e102732c
JB
26import type {
27 IWorker,
28 MessageHandler,
29 Task,
30 WorkerNode,
31 WorkerUsage
32} from './worker'
a35560ba 33import {
f0d7f803 34 Measurements,
a35560ba 35 WorkerChoiceStrategies,
a20f0ba5
JB
36 type WorkerChoiceStrategy,
37 type WorkerChoiceStrategyOptions
bdaf31cd
JB
38} from './selection-strategies/selection-strategies-types'
39import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
c97c7edb 40
729c563d 41/**
ea7a90d3 42 * Base class that implements some shared logic for all poolifier pools.
729c563d 43 *
38e795c1 44 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
45 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
46 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 47 */
c97c7edb 48export abstract class AbstractPool<
f06e48d8 49 Worker extends IWorker,
d3c8a1a8
S
50 Data = unknown,
51 Response = unknown
c4855468 52> implements IPool<Worker, Data, Response> {
afc003b2 53 /** @inheritDoc */
f06e48d8 54 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
4a6952ff 55
afc003b2 56 /** @inheritDoc */
7c0ba920
JB
57 public readonly emitter?: PoolEmitter
58
be0676b3 59 /**
a3445496 60 * The execution response promise map.
be0676b3 61 *
2740a743 62 * - `key`: The message id of each submitted task.
a3445496 63 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 64 *
a3445496 65 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 66 */
c923ce56
JB
67 protected promiseResponseMap: Map<
68 string,
69 PromiseResponseWrapper<Worker, Response>
70 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 71
a35560ba 72 /**
51fe3d3c 73 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
74 */
75 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
76 Worker,
77 Data,
78 Response
a35560ba
S
79 >
80
afe0d5bf
JB
81 /**
82 * The start timestamp of the pool.
83 */
84 private readonly startTimestamp
85
729c563d
S
86 /**
87 * Constructs a new poolifier pool.
88 *
38e795c1 89 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 90 * @param filePath - Path to the worker file.
38e795c1 91 * @param opts - Options for the pool.
729c563d 92 */
c97c7edb 93 public constructor (
b4213b7f
JB
94 protected readonly numberOfWorkers: number,
95 protected readonly filePath: string,
96 protected readonly opts: PoolOptions<Worker>
c97c7edb 97 ) {
78cea37e 98 if (!this.isMain()) {
c97c7edb
S
99 throw new Error('Cannot start a pool from a worker!')
100 }
8d3782fa 101 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 102 this.checkFilePath(this.filePath)
7c0ba920 103 this.checkPoolOptions(this.opts)
1086026a 104
7254e419
JB
105 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
106 this.executeTask = this.executeTask.bind(this)
107 this.enqueueTask = this.enqueueTask.bind(this)
108 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
1086026a 109
6bd72cd0 110 if (this.opts.enableEvents === true) {
7c0ba920
JB
111 this.emitter = new PoolEmitter()
112 }
d59df138
JB
113 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
114 Worker,
115 Data,
116 Response
da309861
JB
117 >(
118 this,
119 this.opts.workerChoiceStrategy,
120 this.opts.workerChoiceStrategyOptions
121 )
b6b32453
JB
122
123 this.setupHook()
124
afe0d5bf 125 while (this.workerNodes.length < this.numberOfWorkers) {
b6b32453
JB
126 this.createAndSetupWorker()
127 }
afe0d5bf
JB
128
129 this.startTimestamp = performance.now()
c97c7edb
S
130 }
131
a35560ba 132 private checkFilePath (filePath: string): void {
ffcbbad8
JB
133 if (
134 filePath == null ||
135 (typeof filePath === 'string' && filePath.trim().length === 0)
136 ) {
c510fea7
APA
137 throw new Error('Please specify a file with a worker implementation')
138 }
139 }
140
8d3782fa
JB
141 private checkNumberOfWorkers (numberOfWorkers: number): void {
142 if (numberOfWorkers == null) {
143 throw new Error(
144 'Cannot instantiate a pool without specifying the number of workers'
145 )
78cea37e 146 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 147 throw new TypeError(
0d80593b 148 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
149 )
150 } else if (numberOfWorkers < 0) {
473c717a 151 throw new RangeError(
8d3782fa
JB
152 'Cannot instantiate a pool with a negative number of workers'
153 )
6b27d407 154 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
8d3782fa
JB
155 throw new Error('Cannot instantiate a fixed pool with no worker')
156 }
157 }
158
7c0ba920 159 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
160 if (isPlainObject(opts)) {
161 this.opts.workerChoiceStrategy =
162 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
163 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
164 this.opts.workerChoiceStrategyOptions =
165 opts.workerChoiceStrategyOptions ??
166 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
49be33fe
JB
167 this.checkValidWorkerChoiceStrategyOptions(
168 this.opts.workerChoiceStrategyOptions
169 )
1f68cede 170 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
171 this.opts.enableEvents = opts.enableEvents ?? true
172 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
173 if (this.opts.enableTasksQueue) {
174 this.checkValidTasksQueueOptions(
175 opts.tasksQueueOptions as TasksQueueOptions
176 )
177 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
178 opts.tasksQueueOptions as TasksQueueOptions
179 )
180 }
181 } else {
182 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 183 }
aee46736
JB
184 }
185
186 private checkValidWorkerChoiceStrategy (
187 workerChoiceStrategy: WorkerChoiceStrategy
188 ): void {
189 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 190 throw new Error(
aee46736 191 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
192 )
193 }
7c0ba920
JB
194 }
195
0d80593b
JB
196 private checkValidWorkerChoiceStrategyOptions (
197 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
198 ): void {
199 if (!isPlainObject(workerChoiceStrategyOptions)) {
200 throw new TypeError(
201 'Invalid worker choice strategy options: must be a plain object'
202 )
203 }
49be33fe
JB
204 if (
205 workerChoiceStrategyOptions.weights != null &&
6b27d407 206 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
207 ) {
208 throw new Error(
209 'Invalid worker choice strategy options: must have a weight for each worker node'
210 )
211 }
f0d7f803
JB
212 if (
213 workerChoiceStrategyOptions.measurement != null &&
214 !Object.values(Measurements).includes(
215 workerChoiceStrategyOptions.measurement
216 )
217 ) {
218 throw new Error(
219 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
220 )
221 }
0d80593b
JB
222 }
223
a20f0ba5
JB
224 private checkValidTasksQueueOptions (
225 tasksQueueOptions: TasksQueueOptions
226 ): void {
0d80593b
JB
227 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
228 throw new TypeError('Invalid tasks queue options: must be a plain object')
229 }
f0d7f803
JB
230 if (
231 tasksQueueOptions?.concurrency != null &&
232 !Number.isSafeInteger(tasksQueueOptions.concurrency)
233 ) {
234 throw new TypeError(
235 'Invalid worker tasks concurrency: must be an integer'
236 )
237 }
238 if (
239 tasksQueueOptions?.concurrency != null &&
240 tasksQueueOptions.concurrency <= 0
241 ) {
a20f0ba5 242 throw new Error(
f0d7f803 243 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
a20f0ba5
JB
244 )
245 }
246 }
247
f59e1027
JB
248 private get starting (): boolean {
249 return this.workerNodes.some(workerNode => !workerNode.info.started)
250 }
251
252 private get started (): boolean {
253 return this.workerNodes.some(workerNode => workerNode.info.started)
254 }
255
08f3f44c 256 /** @inheritDoc */
6b27d407
JB
257 public get info (): PoolInfo {
258 return {
259 type: this.type,
184855e6 260 worker: this.worker,
6b27d407
JB
261 minSize: this.minSize,
262 maxSize: this.maxSize,
afe0d5bf 263 utilization: round(this.utilization),
6b27d407
JB
264 workerNodes: this.workerNodes.length,
265 idleWorkerNodes: this.workerNodes.reduce(
266 (accumulator, workerNode) =>
f59e1027 267 workerNode.usage.tasks.executing === 0
a4e07f72
JB
268 ? accumulator + 1
269 : accumulator,
6b27d407
JB
270 0
271 ),
272 busyWorkerNodes: this.workerNodes.reduce(
273 (accumulator, workerNode) =>
f59e1027 274 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
275 0
276 ),
a4e07f72 277 executedTasks: this.workerNodes.reduce(
6b27d407 278 (accumulator, workerNode) =>
f59e1027 279 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
280 0
281 ),
282 executingTasks: this.workerNodes.reduce(
283 (accumulator, workerNode) =>
f59e1027 284 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
285 0
286 ),
287 queuedTasks: this.workerNodes.reduce(
df593701 288 (accumulator, workerNode) =>
f59e1027 289 accumulator + workerNode.usage.tasks.queued,
6b27d407
JB
290 0
291 ),
292 maxQueuedTasks: this.workerNodes.reduce(
293 (accumulator, workerNode) =>
f59e1027 294 accumulator + workerNode.usage.tasks.maxQueued,
6b27d407 295 0
a4e07f72
JB
296 ),
297 failedTasks: this.workerNodes.reduce(
298 (accumulator, workerNode) =>
f59e1027 299 accumulator + workerNode.usage.tasks.failed,
a4e07f72 300 0
6b27d407
JB
301 )
302 }
303 }
08f3f44c 304
afe0d5bf
JB
305 /**
306 * Gets the pool run time.
307 *
308 * @returns The pool run time in milliseconds.
309 */
310 private get runTime (): number {
311 return performance.now() - this.startTimestamp
312 }
313
314 /**
315 * Gets the approximate pool utilization.
316 *
317 * @returns The pool utilization.
318 */
319 private get utilization (): number {
320 const poolRunTimeCapacity = this.runTime * this.maxSize
321 const totalTasksRunTime = this.workerNodes.reduce(
322 (accumulator, workerNode) =>
323 accumulator + workerNode.usage.runTime.aggregate,
324 0
325 )
326 const totalTasksWaitTime = this.workerNodes.reduce(
327 (accumulator, workerNode) =>
328 accumulator + workerNode.usage.waitTime.aggregate,
329 0
330 )
331 return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
332 }
333
8881ae32
JB
334 /**
335 * Pool type.
336 *
337 * If it is `'dynamic'`, it provides the `max` property.
338 */
339 protected abstract get type (): PoolType
340
184855e6
JB
341 /**
342 * Gets the worker type.
343 */
344 protected abstract get worker (): WorkerType
345
c2ade475 346 /**
6b27d407 347 * Pool minimum size.
c2ade475 348 */
6b27d407 349 protected abstract get minSize (): number
ff733df7
JB
350
351 /**
6b27d407 352 * Pool maximum size.
ff733df7 353 */
6b27d407 354 protected abstract get maxSize (): number
a35560ba 355
f59e1027
JB
356 /**
357 * Get the worker given its id.
358 *
359 * @param workerId - The worker id.
360 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
361 */
362 private getWorkerById (workerId: number): Worker | undefined {
363 return this.workerNodes.find(workerNode => workerNode.info.id === workerId)
364 ?.worker
365 }
366
ffcbbad8 367 /**
f06e48d8 368 * Gets the given worker its worker node key.
ffcbbad8
JB
369 *
370 * @param worker - The worker.
f59e1027 371 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 372 */
f06e48d8
JB
373 private getWorkerNodeKey (worker: Worker): number {
374 return this.workerNodes.findIndex(
375 workerNode => workerNode.worker === worker
376 )
bf9549ae
JB
377 }
378
afc003b2 379 /** @inheritDoc */
a35560ba 380 public setWorkerChoiceStrategy (
59219cbb
JB
381 workerChoiceStrategy: WorkerChoiceStrategy,
382 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 383 ): void {
aee46736 384 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 385 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
386 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
387 this.opts.workerChoiceStrategy
388 )
389 if (workerChoiceStrategyOptions != null) {
390 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
391 }
9c16fb4b 392 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
8604aaab
JB
393 this.setWorkerNodeTasksUsage(
394 workerNode,
9c16fb4b 395 this.getWorkerUsage(workerNodeKey)
8604aaab 396 )
b6b32453 397 this.setWorkerStatistics(workerNode.worker)
59219cbb 398 }
a20f0ba5
JB
399 }
400
401 /** @inheritDoc */
402 public setWorkerChoiceStrategyOptions (
403 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
404 ): void {
0d80593b 405 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
a20f0ba5
JB
406 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
407 this.workerChoiceStrategyContext.setOptions(
408 this.opts.workerChoiceStrategyOptions
a35560ba
S
409 )
410 }
411
a20f0ba5 412 /** @inheritDoc */
8f52842f
JB
413 public enableTasksQueue (
414 enable: boolean,
415 tasksQueueOptions?: TasksQueueOptions
416 ): void {
a20f0ba5 417 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 418 this.flushTasksQueues()
a20f0ba5
JB
419 }
420 this.opts.enableTasksQueue = enable
8f52842f 421 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
422 }
423
424 /** @inheritDoc */
8f52842f 425 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 426 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
427 this.checkValidTasksQueueOptions(tasksQueueOptions)
428 this.opts.tasksQueueOptions =
429 this.buildTasksQueueOptions(tasksQueueOptions)
5baee0d7 430 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
431 delete this.opts.tasksQueueOptions
432 }
433 }
434
435 private buildTasksQueueOptions (
436 tasksQueueOptions: TasksQueueOptions
437 ): TasksQueueOptions {
438 return {
439 concurrency: tasksQueueOptions?.concurrency ?? 1
440 }
441 }
442
c319c66b
JB
443 /**
444 * Whether the pool is full or not.
445 *
446 * The pool filling boolean status.
447 */
dea903a8
JB
448 protected get full (): boolean {
449 return this.workerNodes.length >= this.maxSize
450 }
c2ade475 451
c319c66b
JB
452 /**
453 * Whether the pool is busy or not.
454 *
455 * The pool busyness boolean status.
456 */
457 protected abstract get busy (): boolean
7c0ba920 458
6c6afb84
JB
459 /**
460 * Whether worker nodes are executing at least one task.
461 *
462 * @returns Worker nodes busyness boolean status.
463 */
c2ade475 464 protected internalBusy (): boolean {
e0ae6100
JB
465 return (
466 this.workerNodes.findIndex(workerNode => {
f59e1027 467 return workerNode.usage.tasks.executing === 0
e0ae6100
JB
468 }) === -1
469 )
cb70b19d
JB
470 }
471
afc003b2 472 /** @inheritDoc */
a86b6df1 473 public async execute (data?: Data, name?: string): Promise<Response> {
b6b32453 474 const timestamp = performance.now()
20dcad1a 475 const workerNodeKey = this.chooseWorkerNode()
adc3c320 476 const submittedTask: Task<Data> = {
a86b6df1 477 name,
e5a5c0fc
JB
478 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
479 data: data ?? ({} as Data),
b6b32453 480 timestamp,
adc3c320
JB
481 id: crypto.randomUUID()
482 }
2e81254d 483 const res = new Promise<Response>((resolve, reject) => {
02706357 484 this.promiseResponseMap.set(submittedTask.id as string, {
2e81254d
JB
485 resolve,
486 reject,
20dcad1a 487 worker: this.workerNodes[workerNodeKey].worker
2e81254d
JB
488 })
489 })
ff733df7
JB
490 if (
491 this.opts.enableTasksQueue === true &&
7171d33f 492 (this.busy ||
f59e1027 493 this.workerNodes[workerNodeKey].usage.tasks.executing >=
7171d33f 494 ((this.opts.tasksQueueOptions as TasksQueueOptions)
3528c992 495 .concurrency as number))
ff733df7 496 ) {
26a929d7
JB
497 this.enqueueTask(workerNodeKey, submittedTask)
498 } else {
2e81254d 499 this.executeTask(workerNodeKey, submittedTask)
adc3c320 500 }
ff733df7 501 this.checkAndEmitEvents()
78cea37e 502 // eslint-disable-next-line @typescript-eslint/return-await
280c2a77
S
503 return res
504 }
c97c7edb 505
afc003b2 506 /** @inheritDoc */
c97c7edb 507 public async destroy (): Promise<void> {
1fbcaa7c 508 await Promise.all(
875a7c37
JB
509 this.workerNodes.map(async (workerNode, workerNodeKey) => {
510 this.flushTasksQueue(workerNodeKey)
47aacbaa 511 // FIXME: wait for tasks to be finished
f06e48d8 512 await this.destroyWorker(workerNode.worker)
1fbcaa7c
JB
513 })
514 )
c97c7edb
S
515 }
516
4a6952ff 517 /**
6c6afb84 518 * Terminates the given worker.
4a6952ff 519 *
f06e48d8 520 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
521 */
522 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 523
729c563d 524 /**
6677a3d3
JB
525 * Setup hook to execute code before worker nodes are created in the abstract constructor.
526 * Can be overridden.
afc003b2
JB
527 *
528 * @virtual
729c563d 529 */
280c2a77 530 protected setupHook (): void {
d99ba5a8 531 // Intentionally empty
280c2a77 532 }
c97c7edb 533
729c563d 534 /**
280c2a77
S
535 * Should return whether the worker is the main worker or not.
536 */
537 protected abstract isMain (): boolean
538
539 /**
2e81254d 540 * Hook executed before the worker task execution.
bf9549ae 541 * Can be overridden.
729c563d 542 *
f06e48d8 543 * @param workerNodeKey - The worker node key.
1c6fe997 544 * @param task - The task to execute.
729c563d 545 */
1c6fe997
JB
546 protected beforeTaskExecutionHook (
547 workerNodeKey: number,
548 task: Task<Data>
549 ): void {
f59e1027 550 const workerUsage = this.workerNodes[workerNodeKey].usage
1c6fe997
JB
551 ++workerUsage.tasks.executing
552 this.updateWaitTimeWorkerUsage(workerUsage, task)
c97c7edb
S
553 }
554
c01733f1 555 /**
2e81254d 556 * Hook executed after the worker task execution.
bf9549ae 557 * Can be overridden.
c01733f1 558 *
c923ce56 559 * @param worker - The worker.
38e795c1 560 * @param message - The received message.
c01733f1 561 */
2e81254d 562 protected afterTaskExecutionHook (
c923ce56 563 worker: Worker,
2740a743 564 message: MessageValue<Response>
bf9549ae 565 ): void {
f59e1027 566 const workerUsage = this.workerNodes[this.getWorkerNodeKey(worker)].usage
f1c06930
JB
567 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
568 this.updateRunTimeWorkerUsage(workerUsage, message)
569 this.updateEluWorkerUsage(workerUsage, message)
570 }
571
572 private updateTaskStatisticsWorkerUsage (
573 workerUsage: WorkerUsage,
574 message: MessageValue<Response>
575 ): void {
a4e07f72
JB
576 const workerTaskStatistics = workerUsage.tasks
577 --workerTaskStatistics.executing
578 ++workerTaskStatistics.executed
82f36766 579 if (message.taskError != null) {
a4e07f72 580 ++workerTaskStatistics.failed
2740a743 581 }
f8eb0a2a
JB
582 }
583
a4e07f72
JB
584 private updateRunTimeWorkerUsage (
585 workerUsage: WorkerUsage,
f8eb0a2a
JB
586 message: MessageValue<Response>
587 ): void {
87de9ff5
JB
588 if (
589 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
932fc8be 590 .aggregate
87de9ff5 591 ) {
932fc8be 592 workerUsage.runTime.aggregate += message.taskPerformance?.runTime ?? 0
c6bd2650 593 if (
932fc8be
JB
594 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
595 .average &&
a4e07f72 596 workerUsage.tasks.executed !== 0
c6bd2650 597 ) {
a4e07f72 598 workerUsage.runTime.average =
f1c06930
JB
599 workerUsage.runTime.aggregate /
600 (workerUsage.tasks.executed - workerUsage.tasks.failed)
3032893a 601 }
3fa4cdd2 602 if (
932fc8be
JB
603 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
604 .median &&
d715b7bc 605 message.taskPerformance?.runTime != null
3fa4cdd2 606 ) {
a4e07f72
JB
607 workerUsage.runTime.history.push(message.taskPerformance.runTime)
608 workerUsage.runTime.median = median(workerUsage.runTime.history)
78099a15 609 }
3032893a 610 }
f8eb0a2a
JB
611 }
612
a4e07f72
JB
613 private updateWaitTimeWorkerUsage (
614 workerUsage: WorkerUsage,
1c6fe997 615 task: Task<Data>
f8eb0a2a 616 ): void {
1c6fe997
JB
617 const timestamp = performance.now()
618 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
87de9ff5
JB
619 if (
620 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
932fc8be 621 .aggregate
87de9ff5 622 ) {
932fc8be 623 workerUsage.waitTime.aggregate += taskWaitTime ?? 0
09a6305f 624 if (
87de9ff5 625 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 626 .waitTime.average &&
a4e07f72 627 workerUsage.tasks.executed !== 0
09a6305f 628 ) {
a4e07f72 629 workerUsage.waitTime.average =
f1c06930
JB
630 workerUsage.waitTime.aggregate /
631 (workerUsage.tasks.executed - workerUsage.tasks.failed)
09a6305f
JB
632 }
633 if (
87de9ff5 634 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 635 .waitTime.median &&
1c6fe997 636 taskWaitTime != null
09a6305f 637 ) {
1c6fe997 638 workerUsage.waitTime.history.push(taskWaitTime)
a4e07f72 639 workerUsage.waitTime.median = median(workerUsage.waitTime.history)
09a6305f 640 }
0567595a 641 }
c01733f1 642 }
643
a4e07f72 644 private updateEluWorkerUsage (
5df69fab 645 workerUsage: WorkerUsage,
62c15a68
JB
646 message: MessageValue<Response>
647 ): void {
5df69fab
JB
648 if (
649 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
650 .aggregate
651 ) {
652 if (workerUsage.elu != null && message.taskPerformance?.elu != null) {
9adcefab
JB
653 workerUsage.elu.idle.aggregate += message.taskPerformance.elu.idle
654 workerUsage.elu.active.aggregate += message.taskPerformance.elu.active
5df69fab
JB
655 workerUsage.elu.utilization =
656 (workerUsage.elu.utilization +
657 message.taskPerformance.elu.utilization) /
658 2
659 } else if (message.taskPerformance?.elu != null) {
660 workerUsage.elu.idle.aggregate = message.taskPerformance.elu.idle
661 workerUsage.elu.active.aggregate = message.taskPerformance.elu.active
662 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
663 }
d715b7bc 664 if (
5df69fab
JB
665 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
666 .average &&
667 workerUsage.tasks.executed !== 0
668 ) {
f1c06930
JB
669 const executedTasks =
670 workerUsage.tasks.executed - workerUsage.tasks.failed
5df69fab 671 workerUsage.elu.idle.average =
f1c06930 672 workerUsage.elu.idle.aggregate / executedTasks
5df69fab 673 workerUsage.elu.active.average =
f1c06930 674 workerUsage.elu.active.aggregate / executedTasks
5df69fab
JB
675 }
676 if (
677 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
678 .median &&
d715b7bc
JB
679 message.taskPerformance?.elu != null
680 ) {
5df69fab
JB
681 workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
682 workerUsage.elu.active.history.push(message.taskPerformance.elu.active)
683 workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
684 workerUsage.elu.active.median = median(workerUsage.elu.active.history)
62c15a68
JB
685 }
686 }
687 }
688
280c2a77 689 /**
f06e48d8 690 * Chooses a worker node for the next task.
280c2a77 691 *
6c6afb84 692 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 693 *
20dcad1a 694 * @returns The worker node key
280c2a77 695 */
6c6afb84 696 private chooseWorkerNode (): number {
930dcf12 697 if (this.shallCreateDynamicWorker()) {
6c6afb84
JB
698 const worker = this.createAndSetupDynamicWorker()
699 if (
700 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
701 ) {
702 return this.getWorkerNodeKey(worker)
703 }
17393ac8 704 }
930dcf12
JB
705 return this.workerChoiceStrategyContext.execute()
706 }
707
6c6afb84
JB
708 /**
709 * Conditions for dynamic worker creation.
710 *
711 * @returns Whether to create a dynamic worker or not.
712 */
713 private shallCreateDynamicWorker (): boolean {
930dcf12 714 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
715 }
716
280c2a77 717 /**
675bb809 718 * Sends a message to the given worker.
280c2a77 719 *
38e795c1
JB
720 * @param worker - The worker which should receive the message.
721 * @param message - The message.
280c2a77
S
722 */
723 protected abstract sendToWorker (
724 worker: Worker,
725 message: MessageValue<Data>
726 ): void
727
4a6952ff 728 /**
f06e48d8 729 * Registers a listener callback on the given worker.
4a6952ff 730 *
38e795c1
JB
731 * @param worker - The worker which should register a listener.
732 * @param listener - The message listener callback.
4a6952ff 733 */
e102732c
JB
734 private registerWorkerMessageListener<Message extends Data | Response>(
735 worker: Worker,
736 listener: (message: MessageValue<Message>) => void
737 ): void {
738 worker.on('message', listener as MessageHandler<Worker>)
739 }
c97c7edb 740
729c563d 741 /**
41344292 742 * Creates a new worker.
6c6afb84
JB
743 *
744 * @returns Newly created worker.
729c563d 745 */
280c2a77 746 protected abstract createWorker (): Worker
c97c7edb 747
729c563d 748 /**
f06e48d8 749 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
6677a3d3 750 * Can be overridden.
729c563d 751 *
38e795c1 752 * @param worker - The newly created worker.
729c563d 753 */
6677a3d3 754 protected afterWorkerSetup (worker: Worker): void {
e102732c
JB
755 // Listen to worker messages.
756 this.registerWorkerMessageListener(worker, this.workerListener())
757 }
c97c7edb 758
4a6952ff 759 /**
f06e48d8 760 * Creates a new worker and sets it up completely in the pool worker nodes.
4a6952ff
JB
761 *
762 * @returns New, completely set up worker.
763 */
764 protected createAndSetupWorker (): Worker {
bdacc2d2 765 const worker = this.createWorker()
280c2a77 766
35cf1c03 767 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 768 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1f68cede
JB
769 worker.on('error', error => {
770 if (this.emitter != null) {
771 this.emitter.emit(PoolEvents.error, error)
772 }
f59e1027 773 if (this.opts.restartWorkerOnError === true && !this.starting) {
1f68cede 774 this.createAndSetupWorker()
5baee0d7
JB
775 }
776 })
a35560ba
S
777 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
778 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 779 worker.once('exit', () => {
f06e48d8 780 this.removeWorkerNode(worker)
a974afa6 781 })
280c2a77 782
f06e48d8 783 this.pushWorkerNode(worker)
280c2a77 784
b6b32453
JB
785 this.setWorkerStatistics(worker)
786
280c2a77
S
787 this.afterWorkerSetup(worker)
788
c97c7edb
S
789 return worker
790 }
be0676b3 791
930dcf12
JB
792 /**
793 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
794 *
795 * @returns New, completely set up dynamic worker.
796 */
797 protected createAndSetupDynamicWorker (): Worker {
798 const worker = this.createAndSetupWorker()
799 this.registerWorkerMessageListener(worker, message => {
e8b3a5ab 800 const workerNodeKey = this.getWorkerNodeKey(worker)
930dcf12
JB
801 if (
802 isKillBehavior(KillBehaviors.HARD, message.kill) ||
7b56f532
JB
803 (message.kill != null &&
804 ((this.opts.enableTasksQueue === false &&
f59e1027 805 this.workerNodes[workerNodeKey].usage.tasks.executing === 0) ||
7b56f532 806 (this.opts.enableTasksQueue === true &&
f59e1027 807 this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
e8b3a5ab 808 this.tasksQueueSize(workerNodeKey) === 0)))
930dcf12
JB
809 ) {
810 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
930dcf12
JB
811 void (this.destroyWorker(worker) as Promise<void>)
812 }
813 })
814 return worker
815 }
816
be0676b3 817 /**
ff733df7 818 * This function is the listener registered for each worker message.
be0676b3 819 *
bdacc2d2 820 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
821 */
822 protected workerListener (): (message: MessageValue<Response>) => void {
4a6952ff 823 return message => {
f59e1027
JB
824 if (message.workerId != null && message.started != null) {
825 // Worker started message received
83fa0a36
JB
826 const worker = this.getWorkerById(message.workerId)
827 if (worker != null) {
828 this.workerNodes[this.getWorkerNodeKey(worker)].info.started =
829 message.started
830 } else {
831 throw new Error('Worker started message received from unknown worker')
832 }
f59e1027 833 } else if (message.id != null) {
a3445496 834 // Task execution response received
2740a743 835 const promiseResponse = this.promiseResponseMap.get(message.id)
b1989cfd 836 if (promiseResponse != null) {
82f36766 837 if (message.taskError != null) {
91ee39ed 838 if (this.emitter != null) {
82f36766 839 this.emitter.emit(PoolEvents.taskError, message.taskError)
91ee39ed 840 }
cd9580e7 841 promiseResponse.reject(message.taskError.message)
a05c10de 842 } else {
2740a743 843 promiseResponse.resolve(message.data as Response)
a05c10de 844 }
2e81254d 845 this.afterTaskExecutionHook(promiseResponse.worker, message)
2740a743 846 this.promiseResponseMap.delete(message.id)
ff733df7
JB
847 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
848 if (
849 this.opts.enableTasksQueue === true &&
416fd65c 850 this.tasksQueueSize(workerNodeKey) > 0
ff733df7 851 ) {
2e81254d
JB
852 this.executeTask(
853 workerNodeKey,
ff733df7
JB
854 this.dequeueTask(workerNodeKey) as Task<Data>
855 )
856 }
e5536a06 857 this.workerChoiceStrategyContext.update(workerNodeKey)
be0676b3
APA
858 }
859 }
860 }
be0676b3 861 }
7c0ba920 862
ff733df7 863 private checkAndEmitEvents (): void {
1f68cede 864 if (this.emitter != null) {
ff733df7 865 if (this.busy) {
6b27d407 866 this.emitter?.emit(PoolEvents.busy, this.info)
ff733df7 867 }
6b27d407
JB
868 if (this.type === PoolTypes.dynamic && this.full) {
869 this.emitter?.emit(PoolEvents.full, this.info)
ff733df7 870 }
164d950a
JB
871 }
872 }
873
0ebe2a9f
JB
874 /**
875 * Sets the given worker node its tasks usage in the pool.
876 *
877 * @param workerNode - The worker node.
a4e07f72 878 * @param workerUsage - The worker usage.
0ebe2a9f
JB
879 */
880 private setWorkerNodeTasksUsage (
881 workerNode: WorkerNode<Worker, Data>,
a4e07f72 882 workerUsage: WorkerUsage
0ebe2a9f 883 ): void {
f59e1027 884 workerNode.usage = workerUsage
0ebe2a9f
JB
885 }
886
a05c10de 887 /**
f06e48d8 888 * Pushes the given worker in the pool worker nodes.
ea7a90d3 889 *
38e795c1 890 * @param worker - The worker.
f06e48d8 891 * @returns The worker nodes length.
ea7a90d3 892 */
f06e48d8 893 private pushWorkerNode (worker: Worker): number {
9c16fb4b 894 this.workerNodes.push({
ffcbbad8 895 worker,
83fa0a36 896 info: { id: this.getWorkerId(worker), started: false },
f59e1027 897 usage: this.getWorkerUsage(),
29ee7e9a 898 tasksQueue: new Queue<Task<Data>>()
ea7a90d3 899 })
9c16fb4b
JB
900 const workerNodeKey = this.getWorkerNodeKey(worker)
901 this.setWorkerNodeTasksUsage(
902 this.workerNodes[workerNodeKey],
903 this.getWorkerUsage(workerNodeKey)
904 )
905 return this.workerNodes.length
ea7a90d3 906 }
c923ce56 907
83fa0a36
JB
908 /**
909 * Gets the worker id.
910 *
911 * @param worker - The worker.
912 * @returns The worker id.
913 */
914 private getWorkerId (worker: Worker): number | undefined {
915 if (this.worker === WorkerTypes.thread) {
916 return worker.threadId
917 } else if (this.worker === WorkerTypes.cluster) {
918 return worker.id
919 }
920 }
921
8604aaab
JB
922 // /**
923 // * Sets the given worker in the pool worker nodes.
924 // *
925 // * @param workerNodeKey - The worker node key.
926 // * @param worker - The worker.
f59e1027 927 // * @param workerInfo - The worker info.
8604aaab
JB
928 // * @param workerUsage - The worker usage.
929 // * @param tasksQueue - The worker task queue.
930 // */
931 // private setWorkerNode (
932 // workerNodeKey: number,
933 // worker: Worker,
f59e1027 934 // workerInfo: WorkerInfo,
8604aaab
JB
935 // workerUsage: WorkerUsage,
936 // tasksQueue: Queue<Task<Data>>
937 // ): void {
938 // this.workerNodes[workerNodeKey] = {
939 // worker,
f59e1027
JB
940 // info: workerInfo,
941 // usage: workerUsage,
8604aaab
JB
942 // tasksQueue
943 // }
944 // }
51fe3d3c
JB
945
946 /**
f06e48d8 947 * Removes the given worker from the pool worker nodes.
51fe3d3c 948 *
f06e48d8 949 * @param worker - The worker.
51fe3d3c 950 */
416fd65c 951 private removeWorkerNode (worker: Worker): void {
f06e48d8 952 const workerNodeKey = this.getWorkerNodeKey(worker)
1f68cede
JB
953 if (workerNodeKey !== -1) {
954 this.workerNodes.splice(workerNodeKey, 1)
955 this.workerChoiceStrategyContext.remove(workerNodeKey)
956 }
51fe3d3c 957 }
adc3c320 958
2e81254d 959 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 960 this.beforeTaskExecutionHook(workerNodeKey, task)
2e81254d
JB
961 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
962 }
963
f9f00b5f 964 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
29ee7e9a 965 return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
adc3c320
JB
966 }
967
416fd65c 968 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
29ee7e9a 969 return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
adc3c320
JB
970 }
971
416fd65c 972 private tasksQueueSize (workerNodeKey: number): number {
4d8bf9e4 973 return this.workerNodes[workerNodeKey].tasksQueue.size
adc3c320 974 }
ff733df7 975
df593701
JB
976 private tasksMaxQueueSize (workerNodeKey: number): number {
977 return this.workerNodes[workerNodeKey].tasksQueue.maxSize
978 }
979
416fd65c
JB
980 private flushTasksQueue (workerNodeKey: number): void {
981 if (this.tasksQueueSize(workerNodeKey) > 0) {
29ee7e9a
JB
982 for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) {
983 this.executeTask(
984 workerNodeKey,
985 this.dequeueTask(workerNodeKey) as Task<Data>
986 )
ff733df7 987 }
ff733df7 988 }
df593701 989 this.workerNodes[workerNodeKey].tasksQueue.clear()
ff733df7
JB
990 }
991
ef41a6e6
JB
992 private flushTasksQueues (): void {
993 for (const [workerNodeKey] of this.workerNodes.entries()) {
994 this.flushTasksQueue(workerNodeKey)
995 }
996 }
b6b32453
JB
997
998 private setWorkerStatistics (worker: Worker): void {
999 this.sendToWorker(worker, {
1000 statistics: {
87de9ff5
JB
1001 runTime:
1002 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 1003 .runTime.aggregate,
87de9ff5 1004 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
5df69fab 1005 .elu.aggregate
b6b32453
JB
1006 }
1007 })
1008 }
8604aaab 1009
9c16fb4b 1010 private getWorkerUsage (workerNodeKey?: number): WorkerUsage {
e3347a5c
JB
1011 const getTasksQueueSize = (workerNodeKey?: number): number => {
1012 return workerNodeKey != null ? this.tasksQueueSize(workerNodeKey) : 0
9c16fb4b 1013 }
df593701
JB
1014 const getTasksMaxQueueSize = (workerNodeKey?: number): number => {
1015 return workerNodeKey != null ? this.tasksMaxQueueSize(workerNodeKey) : 0
1016 }
8604aaab 1017 return {
9c16fb4b
JB
1018 tasks: {
1019 executed: 0,
1020 executing: 0,
1021 get queued (): number {
e3347a5c 1022 return getTasksQueueSize(workerNodeKey)
9c16fb4b 1023 },
df593701
JB
1024 get maxQueued (): number {
1025 return getTasksMaxQueueSize(workerNodeKey)
1026 },
9c16fb4b
JB
1027 failed: 0
1028 },
8604aaab 1029 runTime: {
932fc8be 1030 aggregate: 0,
8604aaab
JB
1031 average: 0,
1032 median: 0,
1033 history: new CircularArray()
1034 },
1035 waitTime: {
932fc8be 1036 aggregate: 0,
8604aaab
JB
1037 average: 0,
1038 median: 0,
1039 history: new CircularArray()
1040 },
5df69fab
JB
1041 elu: {
1042 idle: {
1043 aggregate: 0,
1044 average: 0,
1045 median: 0,
1046 history: new CircularArray()
1047 },
1048 active: {
1049 aggregate: 0,
1050 average: 0,
1051 median: 0,
1052 history: new CircularArray()
1053 },
1054 utilization: 0
1055 }
8604aaab
JB
1056 }
1057 }
c97c7edb 1058}