feat: add worker info to worker nodes
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
fc3e6586 1import crypto from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
2740a743 3import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
4import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
0d80593b 7 isPlainObject,
bbeadd16
JB
8 median
9} from '../utils'
34a0cfab 10import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
65d7a1c9 11import { CircularArray } from '../circular-array'
29ee7e9a 12import { Queue } from '../queue'
c4855468 13import {
65d7a1c9 14 type IPool,
7c5a1080 15 PoolEmitter,
c4855468 16 PoolEvents,
6b27d407 17 type PoolInfo,
c4855468 18 type PoolOptions,
6b27d407
JB
19 type PoolType,
20 PoolTypes,
184855e6
JB
21 type TasksQueueOptions,
22 type WorkerType
c4855468 23} from './pool'
e102732c
JB
24import type {
25 IWorker,
26 MessageHandler,
27 Task,
28 WorkerNode,
29 WorkerUsage
30} from './worker'
a35560ba 31import {
f0d7f803 32 Measurements,
a35560ba 33 WorkerChoiceStrategies,
a20f0ba5
JB
34 type WorkerChoiceStrategy,
35 type WorkerChoiceStrategyOptions
bdaf31cd
JB
36} from './selection-strategies/selection-strategies-types'
37import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
c97c7edb 38
729c563d 39/**
ea7a90d3 40 * Base class that implements some shared logic for all poolifier pools.
729c563d 41 *
38e795c1 42 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
43 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
44 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 45 */
c97c7edb 46export abstract class AbstractPool<
f06e48d8 47 Worker extends IWorker,
d3c8a1a8
S
48 Data = unknown,
49 Response = unknown
c4855468 50> implements IPool<Worker, Data, Response> {
afc003b2 51 /** @inheritDoc */
f06e48d8 52 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
4a6952ff 53
afc003b2 54 /** @inheritDoc */
7c0ba920
JB
55 public readonly emitter?: PoolEmitter
56
be0676b3 57 /**
a3445496 58 * The execution response promise map.
be0676b3 59 *
2740a743 60 * - `key`: The message id of each submitted task.
a3445496 61 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 62 *
a3445496 63 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 64 */
c923ce56
JB
65 protected promiseResponseMap: Map<
66 string,
67 PromiseResponseWrapper<Worker, Response>
68 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 69
a35560ba 70 /**
51fe3d3c 71 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
72 */
73 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
74 Worker,
75 Data,
76 Response
a35560ba
S
77 >
78
729c563d
S
/**
 * Constructs a new poolifier pool.
 *
 * Validates the arguments, binds the task scheduling methods to the instance, creates the event
 * emitter when events are enabled, builds the worker choice strategy context, runs the setup hook
 * and then creates `numberOfWorkers` worker nodes.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }
  // Arguments validation; checkPoolOptions() also fills in the option defaults.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Keep `this` stable for the methods involved in task scheduling.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  for (let created = 0; created < this.numberOfWorkers; created++) {
    this.createAndSetupWorker()
  }
}
122
/**
 * Checks that a worker file path has been given.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is `null`/`undefined` or a string made only of whitespace.
 */
private checkFilePath (filePath: string): void {
  const isBlankString =
    typeof filePath === 'string' && filePath.trim().length === 0
  if (filePath == null || isBlankString) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
131
8d3782fa
JB
/**
 * Checks the requested number of workers.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws Error if the number is missing, or if it is zero for a fixed pool.
 * @throws TypeError if the number is not a safe integer.
 * @throws RangeError if the number is negative.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new Error('Cannot instantiate a fixed pool with no worker')
  }
}
149
/**
 * Validates the pool options and stores the resolved values (defaults included) in `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws TypeError if `opts` is not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  // Worker choice strategy: defaults to round robin.
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
  this.opts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )
  // Boolean switches and their defaults.
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  if (!this.opts.enableTasksQueue) {
    return
  }
  // Tasks queue options are only validated and built when the queue is enabled.
  const queueOptions = opts.tasksQueueOptions as TasksQueueOptions
  this.checkValidTasksQueueOptions(queueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(queueOptions)
}
176
/**
 * Checks that the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws Error if the strategy is not a known one.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
186
0d80593b
JB
/**
 * Checks the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if `weights` does not have exactly `maxSize` keys or if `measurement` is not a
 * `Measurements` value.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  const hasWrongWeightsCount =
    weights != null && Object.keys(weights).length !== this.maxSize
  if (hasWrongWeightsCount) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  const hasUnknownMeasurement =
    measurement != null && !Object.values(Measurements).includes(measurement)
  if (hasUnknownMeasurement) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
    )
  }
}
214
a20f0ba5
JB
/**
 * Checks the tasks queue options. Nullish options and a nullish `concurrency` are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws TypeError if the options are not a plain object or `concurrency` is not a safe integer.
 * @throws Error if `concurrency` is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
238
f59e1027
JB
/**
 * Whether at least one worker node has not been flagged as started yet.
 */
private get starting (): boolean {
  return !this.workerNodes.every(({ info }) => info.started)
}

/**
 * Whether at least one worker node has been flagged as started.
 */
private get started (): boolean {
  return this.workerNodes.some(({ info }) => info.started)
}
246
/** @inheritDoc */
public get info (): PoolInfo {
  // Number of worker nodes whose count of executing tasks satisfies the given condition.
  const countNodes = (matches: (executing: number) => boolean): number =>
    this.workerNodes.filter(({ usage }) => matches(usage.tasks.executing))
      .length
  // Sum of one tasks counter over all the worker nodes.
  const sumTasks = (pick: (tasks: WorkerUsage['tasks']) => number): number =>
    this.workerNodes.reduce((total, { usage }) => total + pick(usage.tasks), 0)
  return {
    type: this.type,
    worker: this.worker,
    minSize: this.minSize,
    maxSize: this.maxSize,
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countNodes(executing => executing === 0),
    busyWorkerNodes: countNodes(executing => executing > 0),
    executedTasks: sumTasks(tasks => tasks.executed),
    executingTasks: sumTasks(tasks => tasks.executing),
    queuedTasks: sumTasks(tasks => tasks.queued),
    maxQueuedTasks: sumTasks(tasks => tasks.maxQueued),
    failedTasks: sumTasks(tasks => tasks.failed)
  }
}
08f3f44c 294
8881ae32
JB
295 /**
296 * Pool type.
297 *
298 * If it is `'dynamic'`, it provides the `max` property.
299 */
300 protected abstract get type (): PoolType
301
184855e6
JB
302 /**
303 * Gets the worker type.
304 */
305 protected abstract get worker (): WorkerType
306
c2ade475 307 /**
6b27d407 308 * Pool minimum size.
c2ade475 309 */
6b27d407 310 protected abstract get minSize (): number
ff733df7
JB
311
312 /**
6b27d407 313 * Pool maximum size.
ff733df7 314 */
6b27d407 315 protected abstract get maxSize (): number
a35560ba 316
f59e1027
JB
/**
 * Looks up a worker from its id.
 *
 * @param workerId - The worker id.
 * @returns The worker of the first worker node whose `info.id` equals `workerId`, `undefined` if there is none.
 */
private getWorkerById (workerId: number): Worker | undefined {
  const matchingNode = this.workerNodes.find(
    ({ info }) => info.id === workerId
  )
  return matchingNode?.worker
}
327
/**
 * Gets the worker node key (index in `workerNodes`) of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (const [nodeKey, node] of this.workerNodes.entries()) {
    if (node.worker === worker) {
      return nodeKey
    }
  }
  return -1
}
339
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Every worker node gets a fresh usage object and the statistics settings are sent to its worker again.
  this.workerNodes.forEach((node, nodeKey) => {
    this.setWorkerNodeTasksUsage(node, this.getWorkerUsage(nodeKey))
    this.setWorkerStatistics(node.worker)
  })
}
361
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Store the validated options, then hand the stored value over to the strategy context.
  const { opts, workerChoiceStrategyContext } = this
  opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  workerChoiceStrategyContext.setOptions(opts.workerChoiceStrategyOptions)
}
372
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  // Switching the queue off: flush what is still queued first.
  const isDisabling = this.opts.enableTasksQueue === true && !enable
  if (isDisabling) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
384
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Queue disabled: stored tasks queue options, if any, are removed.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
395
/**
 * Builds the effective tasks queue options.
 *
 * @param tasksQueueOptions - The tasks queue options given by the user, possibly nullish.
 * @returns Tasks queue options with `concurrency` defaulting to `1`.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
403
c319c66b
JB
/**
 * Whether the pool is full or not.
 *
 * The pool is full when the number of worker nodes has reached `maxSize`.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
c2ade475 412
c319c66b
JB
413 /**
414 * Whether the pool is busy or not.
415 *
416 * The pool busyness boolean status.
417 */
418 protected abstract get busy (): boolean
7c0ba920 419
6c6afb84
JB
/**
 * Whether every worker node is executing at least one task.
 *
 * @returns `true` when no worker node has zero executing tasks (also `true` with no worker node at all).
 */
protected internalBusy (): boolean {
  return this.workerNodes.every(({ usage }) => usage.tasks.executing !== 0)
}
432
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  // Taken before the worker node is chosen; used later to compute the task wait time.
  const timestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const submittedTask: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp,
    id: crypto.randomUUID()
  }
  // The resolve/reject callbacks are stored under the task id until the worker answers.
  const response = new Promise<Response>((resolve, reject) => {
    const worker = this.workerNodes[workerNodeKey].worker
    this.promiseResponseMap.set(submittedTask.id as string, {
      resolve,
      reject,
      worker
    })
  })
  // Queue the task when queueing is enabled and either the pool is busy or the
  // chosen worker node already runs as many tasks as the configured concurrency.
  const mustBeQueued =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      this.workerNodes[workerNodeKey].usage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (mustBeQueued) {
    this.enqueueTask(workerNodeKey, submittedTask)
  } else {
    this.executeTask(workerNodeKey, submittedTask)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return response
}
c97c7edb 466
/** @inheritDoc */
public async destroy (): Promise<void> {
  const terminations = this.workerNodes.map(
    async (workerNode, workerNodeKey) => {
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      await this.destroyWorker(workerNode.worker)
    }
  )
  await Promise.all(terminations)
}
477
4a6952ff 478 /**
6c6afb84 479 * Terminates the given worker.
4a6952ff 480 *
f06e48d8 481 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
482 */
483 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 484
/**
 * Hook called by the abstract constructor right before the worker nodes are created.
 * Does nothing by default; can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // No default behavior
}
c97c7edb 494
729c563d 495 /**
280c2a77
S
496 * Should return whether the worker is the main worker or not.
497 */
498 protected abstract isMain (): boolean
499
/**
 * Hook executed before the worker task execution.
 * Increments the executing tasks counter of the worker node and updates its wait time usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const { usage } = this.workerNodes[workerNodeKey]
  usage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(usage, task)
}
515
/**
 * Hook executed after the worker task execution.
 * Updates the tasks counters, run time and ELU usage of the worker node owning the worker.
 * Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const { usage } = this.workerNodes[workerNodeKey]
  this.updateTaskStatisticsWorkerUsage(usage, message)
  this.updateRunTimeWorkerUsage(usage, message)
  this.updateEluWorkerUsage(usage, message)
}
532
/**
 * Updates the tasks counters once a task response has been received: one less executing task,
 * one more executed task and, when the message carries a task error, one more failed task.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  tasks.executing -= 1
  tasks.executed += 1
  if (message.taskError != null) {
    tasks.failed += 1
  }
}
544
a4e07f72
JB
/**
 * Updates the run time usage of a worker node from a task response.
 *
 * Nothing is updated unless the current strategy requires the run time aggregate. The average
 * and the median are only updated when the strategy requires them.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message, possibly carrying `taskPerformance.runTime`.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  if (!requirements.aggregate) {
    return
  }
  workerUsage.runTime.aggregate += message.taskPerformance?.runTime ?? 0
  // The average is computed over the tasks that did not fail. Guarding on that
  // count (and not only on the executed count) avoids a division by zero,
  // hence a NaN/Infinity average, when every executed task has failed.
  const successfulTasks =
    workerUsage.tasks.executed - workerUsage.tasks.failed
  if (requirements.average && successfulTasks > 0) {
    workerUsage.runTime.average =
      workerUsage.runTime.aggregate / successfulTasks
  }
  if (requirements.median && message.taskPerformance?.runTime != null) {
    workerUsage.runTime.history.push(message.taskPerformance.runTime)
    workerUsage.runTime.median = median(workerUsage.runTime.history)
  }
}
573
a4e07f72
JB
/**
 * Updates the wait time usage of a worker node for a task about to be executed.
 *
 * The task wait time is the time elapsed since `task.timestamp` (zero when the task has no
 * timestamp). Nothing is updated unless the current strategy requires the wait time aggregate.
 * The average and the median are only updated when the strategy requires them.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const timestamp = performance.now()
  const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  if (!requirements.aggregate) {
    return
  }
  workerUsage.waitTime.aggregate += taskWaitTime
  // Guard on the actual divisor: when every executed task has failed it is
  // zero and the average would otherwise become NaN/Infinity.
  const successfulTasks =
    workerUsage.tasks.executed - workerUsage.tasks.failed
  if (requirements.average && successfulTasks > 0) {
    workerUsage.waitTime.average =
      workerUsage.waitTime.aggregate / successfulTasks
  }
  if (requirements.median) {
    workerUsage.waitTime.history.push(taskWaitTime)
    workerUsage.waitTime.median = median(workerUsage.waitTime.history)
  }
}
604
/**
 * Updates the event loop utilization (ELU) usage of a worker node from a task response.
 *
 * Nothing is updated unless the current strategy requires the ELU aggregate. The averages and
 * the medians are only updated when the strategy requires them.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message, possibly carrying `taskPerformance.elu`.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  if (!requirements.aggregate) {
    return
  }
  const taskElu = message.taskPerformance?.elu
  if (taskElu != null) {
    workerUsage.elu.idle.aggregate += taskElu.idle
    workerUsage.elu.active.aggregate += taskElu.active
    // NOTE(review): the utilization is the mean of the previous value and the
    // new sample, so the very first sample is averaged with the initial 0.
    workerUsage.elu.utilization =
      (workerUsage.elu.utilization + taskElu.utilization) / 2
  }
  // Guard on the actual divisor: when every executed task has failed it is
  // zero and the averages would otherwise become NaN/Infinity.
  const successfulTasks =
    workerUsage.tasks.executed - workerUsage.tasks.failed
  if (requirements.average && successfulTasks > 0) {
    workerUsage.elu.idle.average =
      workerUsage.elu.idle.aggregate / successfulTasks
    workerUsage.elu.active.average =
      workerUsage.elu.active.aggregate / successfulTasks
  }
  if (requirements.median && taskElu != null) {
    workerUsage.elu.idle.history.push(taskElu.idle)
    workerUsage.elu.active.history.push(taskElu.active)
    workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
    workerUsage.elu.active.median = median(workerUsage.elu.active.history)
  }
}
649
/**
 * Chooses a worker node for the next task.
 *
 * When a dynamic worker shall be created, it is created first; its worker node is returned if
 * the strategy policy asks to use the dynamic worker. In every other case the choice is delegated
 * to the worker choice strategy (round robin by default).
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  return this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
668
6c6afb84
JB
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full, and every worker node
 * is executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
677
280c2a77 678 /**
675bb809 679 * Sends a message to the given worker.
280c2a77 680 *
38e795c1
JB
681 * @param worker - The worker which should receive the message.
682 * @param message - The message.
280c2a77
S
683 */
684 protected abstract sendToWorker (
685 worker: Worker,
686 message: MessageValue<Data>
687 ): void
688
/**
 * Registers a `'message'` listener callback on the given worker.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
private registerWorkerMessageListener<Message extends Data | Response>(
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void {
  const messageHandler = listener as MessageHandler<Worker>
  worker.on('message', messageHandler)
}
c97c7edb 701
729c563d 702 /**
41344292 703 * Creates a new worker.
6c6afb84
JB
704 *
705 * @returns Newly created worker.
729c563d 706 */
280c2a77 707 protected abstract createWorker (): Worker
c97c7edb 708
/**
 * Hook called once a newly created worker has been added to the pool worker nodes.
 * By default it registers the pool message listener on the worker.
 * Can be overridden.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  // Route the worker messages to the pool listener.
  const poolListener = this.workerListener()
  this.registerWorkerMessageListener(worker, poolListener)
}
c97c7edb 719
/**
 * Creates a new worker and sets it up completely in the pool worker nodes.
 *
 * The user handlers from the pool options are registered, then the worker is pushed to the
 * worker nodes, receives the statistics settings and goes through `afterWorkerSetup()`.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()
  const { messageHandler, errorHandler, onlineHandler, exitHandler } =
    this.opts

  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    this.emitter?.emit(PoolEvents.error, error)
    // A replacement worker is only created when no worker node is still starting.
    const shallRestart =
      this.opts.restartWorkerOnError === true && !this.starting
    if (shallRestart) {
      this.createAndSetupWorker()
    }
  })
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  // An exited worker leaves the pool worker nodes.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)
  this.setWorkerStatistics(worker)
  this.afterWorkerSetup(worker)

  return worker
}
be0676b3 752
930dcf12
JB
/**
 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
 *
 * On top of the regular setup, a message listener destroys the worker when it sends a hard kill
 * message, or any kill message while it has no executing task (and, with the tasks queue
 * enabled, no queued task either).
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    // Evaluated lazily: the worker node is only inspected for a non hard kill message.
    const isKillableWhenIdle = (): boolean => {
      if (message.kill == null) {
        return false
      }
      const queueEnabled = this.opts.enableTasksQueue
      if (queueEnabled === false) {
        return this.workerNodes[workerNodeKey].usage.tasks.executing === 0
      }
      if (queueEnabled === true) {
        return (
          this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
          this.tasksQueueSize(workerNodeKey) === 0
        )
      }
      return false
    }
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      isKillableWhenIdle()
    ) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  return worker
}
777
/**
 * Builds the listener registered for each worker message.
 *
 * The listener handles two kinds of messages:
 * - worker started messages (`workerId` and `started` set): the matching worker node `info.started` flag is updated;
 * - task execution responses (`id` set): the task promise is settled, the worker usage is updated, the next queued
 *   task (if any) is executed and the worker choice strategy is updated.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    if (message.workerId != null && message.started != null) {
      // Worker started message received
      const workerNode = this.workerNodes.find(
        node => node.info.id === message.workerId
      )
      // The id may match no worker node (e.g. a worker already removed from the pool):
      // ignore the message instead of throwing from the message handler.
      if (workerNode != null) {
        workerNode.info.started = message.started
      }
    } else if (message.id != null) {
      // Task execution response received
      const promiseResponse = this.promiseResponseMap.get(message.id)
      if (promiseResponse != null) {
        if (message.taskError != null) {
          if (this.emitter != null) {
            this.emitter.emit(PoolEvents.taskError, message.taskError)
          }
          promiseResponse.reject(message.taskError.message)
        } else {
          promiseResponse.resolve(message.data as Response)
        }
        this.afterTaskExecutionHook(promiseResponse.worker, message)
        this.promiseResponseMap.delete(message.id)
        const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
        // The worker node has a free slot again: run its next queued task, if any.
        if (
          this.opts.enableTasksQueue === true &&
          this.tasksQueueSize(workerNodeKey) > 0
        ) {
          this.executeTask(
            workerNodeKey,
            this.dequeueTask(workerNodeKey) as Task<Data>
          )
        }
        this.workerChoiceStrategyContext.update(workerNodeKey)
      }
    }
  }
}
7c0ba920 819
/**
 * Emits the `busy` and `full` pool events, with the pool information as payload, when the
 * emitter exists and the matching condition holds (`full` is only emitted for a dynamic pool).
 */
private checkAndEmitEvents (): void {
  if (this.emitter == null) {
    return
  }
  if (this.busy) {
    this.emitter.emit(PoolEvents.busy, this.info)
  }
  const isFullDynamicPool = this.type === PoolTypes.dynamic && this.full
  if (isFullDynamicPool) {
    this.emitter.emit(PoolEvents.full, this.info)
  }
}
830
0ebe2a9f
JB
/**
 * Replaces the usage object of the given worker node.
 *
 * @param workerNode - The worker node.
 * @param workerUsage - The worker usage to assign to the worker node.
 */
private setWorkerNodeTasksUsage (
  workerNode: WorkerNode<Worker, Data>,
  workerUsage: WorkerUsage
): void {
  const targetNode = workerNode
  targetNode.usage = workerUsage
}
843
/**
 * Pushes the given worker in the pool worker nodes.
 *
 * The node is first pushed with a usage that is not bound to any worker node key, then its
 * usage is replaced by one bound to the key the node got in `workerNodes`.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private pushWorkerNode (worker: Worker): number {
  const newNode: WorkerNode<Worker, Data> = {
    worker,
    info: { id: worker.threadId ?? worker.id, started: false },
    usage: this.getWorkerUsage(),
    tasksQueue: new Queue<Task<Data>>()
  }
  this.workerNodes.push(newNode)
  const nodeKey = this.getWorkerNodeKey(worker)
  this.setWorkerNodeTasksUsage(
    this.workerNodes[nodeKey],
    this.getWorkerUsage(nodeKey)
  )
  return this.workerNodes.length
}
c923ce56 864
8604aaab
JB
865 // /**
866 // * Sets the given worker in the pool worker nodes.
867 // *
868 // * @param workerNodeKey - The worker node key.
869 // * @param worker - The worker.
f59e1027 870 // * @param workerInfo - The worker info.
8604aaab
JB
871 // * @param workerUsage - The worker usage.
872 // * @param tasksQueue - The worker task queue.
873 // */
874 // private setWorkerNode (
875 // workerNodeKey: number,
876 // worker: Worker,
f59e1027 877 // workerInfo: WorkerInfo,
8604aaab
JB
878 // workerUsage: WorkerUsage,
879 // tasksQueue: Queue<Task<Data>>
880 // ): void {
881 // this.workerNodes[workerNodeKey] = {
882 // worker,
f59e1027
JB
883 // info: workerInfo,
884 // usage: workerUsage,
8604aaab
JB
885 // tasksQueue
886 // }
887 // }
51fe3d3c
JB
888
/**
 * Removes the given worker from the pool worker nodes and from the worker choice strategy
 * context. Does nothing if the worker is not in the pool worker nodes.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const nodeKey = this.getWorkerNodeKey(worker)
  if (nodeKey === -1) {
    return
  }
  this.workerNodes.splice(nodeKey, 1)
  this.workerChoiceStrategyContext.remove(nodeKey)
}
adc3c320 901
/**
 * Runs the before task execution hook, then sends the task to the worker of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
906
/**
 * Adds a task to the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the queue `enqueue()` method.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.enqueue(task)
}

/**
 * Takes the next task out of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the queue `dequeue()` method.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.dequeue()
}

/**
 * Gets the `size` of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue `size`.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.size
}

/**
 * Gets the `maxSize` of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue `maxSize`.
 */
private tasksMaxQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.maxSize
}
922
416fd65c
JB
/**
 * Flushes the tasks queue of the given worker node: every queued task is dequeued and executed,
 * then the queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  // Drain until empty. An index-based loop compared against the shrinking queue size
  // stops half way, and the tasks left would be dropped by clear() without being executed.
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  this.workerNodes[workerNodeKey].tasksQueue.clear()
}
934
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
b6b32453
JB
940
/**
 * Sends the statistics settings to the given worker: the `runTime` and `elu` flags carry the
 * run time and ELU aggregate requirements of the current worker choice strategy.
 *
 * @param worker - The worker.
 */
private setWorkerStatistics (worker: Worker): void {
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const statistics = {
    runTime: requirements.runTime.aggregate,
    elu: requirements.elu.aggregate
  }
  this.sendToWorker(worker, { statistics })
}
8604aaab 952
/**
 * Builds a fresh worker usage object with every counter and measurement set to zero.
 *
 * `tasks.queued` and `tasks.maxQueued` are getters reading the tasks queue of the worker node
 * at `workerNodeKey`; they return `0` when no worker node key is given.
 *
 * @param workerNodeKey - The worker node key, if the usage is bound to a worker node.
 * @returns The new worker usage.
 */
private getWorkerUsage (workerNodeKey?: number): WorkerUsage {
  // Arrow functions keep `this` bound to the pool inside the object getters below.
  const queuedTasks = (): number =>
    workerNodeKey != null ? this.tasksQueueSize(workerNodeKey) : 0
  const maxQueuedTasks = (): number =>
    workerNodeKey != null ? this.tasksMaxQueueSize(workerNodeKey) : 0
  const runTime = {
    aggregate: 0,
    average: 0,
    median: 0,
    history: new CircularArray()
  }
  const waitTime = {
    aggregate: 0,
    average: 0,
    median: 0,
    history: new CircularArray()
  }
  const eluIdle = {
    aggregate: 0,
    average: 0,
    median: 0,
    history: new CircularArray()
  }
  const eluActive = {
    aggregate: 0,
    average: 0,
    median: 0,
    history: new CircularArray()
  }
  return {
    tasks: {
      executed: 0,
      executing: 0,
      get queued (): number {
        return queuedTasks()
      },
      get maxQueued (): number {
        return maxQueuedTasks()
      },
      failed: 0
    },
    runTime,
    waitTime,
    elu: {
      idle: eluIdle,
      active: eluActive,
      utilization: 0
    }
  }
}
c97c7edb 1001}