Merge branch 'master' of github.com:jerome-benoit/poolifier
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
fc3e6586 1import crypto from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
2740a743 3import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
4import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
59317253 7 isKillBehavior,
0d80593b 8 isPlainObject,
afe0d5bf
JB
9 median,
10 round
bbeadd16 11} from '../utils'
59317253 12import { KillBehaviors } from '../worker/worker-options'
65d7a1c9 13import { CircularArray } from '../circular-array'
29ee7e9a 14import { Queue } from '../queue'
c4855468 15import {
65d7a1c9 16 type IPool,
7c5a1080 17 PoolEmitter,
c4855468 18 PoolEvents,
6b27d407 19 type PoolInfo,
c4855468 20 type PoolOptions,
6b27d407
JB
21 type PoolType,
22 PoolTypes,
184855e6 23 type TasksQueueOptions,
83fa0a36
JB
24 type WorkerType,
25 WorkerTypes
c4855468 26} from './pool'
e102732c
JB
27import type {
28 IWorker,
29 MessageHandler,
30 Task,
31 WorkerNode,
32 WorkerUsage
33} from './worker'
a35560ba 34import {
f0d7f803 35 Measurements,
a35560ba 36 WorkerChoiceStrategies,
a20f0ba5
JB
37 type WorkerChoiceStrategy,
38 type WorkerChoiceStrategyOptions
bdaf31cd
JB
39} from './selection-strategies/selection-strategies-types'
40import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 41import { version } from './version'
23ccf9d7 42
729c563d 43/**
ea7a90d3 44 * Base class that implements some shared logic for all poolifier pools.
729c563d 45 *
38e795c1 46 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
47 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
48 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 49 */
c97c7edb 50export abstract class AbstractPool<
f06e48d8 51 Worker extends IWorker,
d3c8a1a8
S
52 Data = unknown,
53 Response = unknown
c4855468 54> implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * Pending task execution responses, indexed by task message id.
 *
 * - `key`: The message id of a submitted task.
 * - `value`: The worker the task was assigned to, together with the `resolve` and `reject` callbacks of the promise handed back to the caller.
 *
 * When a worker answers, the entry matching the message id is looked up to settle that promise.
 */
protected promiseResponseMap: Map<
  string,
  PromiseResponseWrapper<Worker, Response>
> = new Map()

/**
 * Worker choice strategy context referencing the worker choice algorithm implementation in use.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
>

/**
 * Timestamp taken with `performance.now()` at the end of the pool construction.
 */
private readonly startTimestamp
87
729c563d
S
/**
 * Builds a new poolifier pool and creates its initial worker nodes.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  // A pool may only be instantiated from the main worker.
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // Validate the arguments; pool options also get their defaults applied here.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Pin `this` on the methods below so they can be passed around detached.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  // Give subclasses a chance to run code before any worker node exists.
  this.setupHook()

  // Fill the pool up to the requested number of workers.
  while (this.workerNodes.length < this.numberOfWorkers) {
    this.createAndSetupWorker()
  }

  this.startTimestamp = performance.now()
}
133
/**
 * Ensures a worker file path was given.
 *
 * @param filePath - Path to the worker file.
 * @throws Error when the path is nullish or is a string holding only whitespace.
 */
private checkFilePath (filePath: string): void {
  const isMissing = filePath == null
  const isBlank = typeof filePath === 'string' && filePath.trim().length === 0
  if (isMissing || isBlank) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
142
8d3782fa
JB
/**
 * Ensures the requested number of workers is usable for this pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws Error when it is nullish, or zero for a fixed pool.
 * @throws TypeError when it is not a safe integer.
 * @throws RangeError when it is negative.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  const isFixedPool = this.type === PoolTypes.fixed
  if (isFixedPool && numberOfWorkers === 0) {
    throw new Error('Cannot instantiate a fixed pool with no worker')
  }
}
160
/**
 * Validates the pool options and writes the defaulted values into `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws TypeError when the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  // Each option is stored first and validated right after.
  const strategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.opts.workerChoiceStrategy = strategy
  this.checkValidWorkerChoiceStrategy(strategy)

  const strategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.opts.workerChoiceStrategyOptions = strategyOptions
  this.checkValidWorkerChoiceStrategyOptions(strategyOptions)

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options only matter when the tasks queue is enabled.
  if (this.opts.enableTasksQueue) {
    const queueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(queueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(queueOptions)
  }
}
187
/**
 * Ensures the given value is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws Error when the strategy is not a known one.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
197
0d80593b
JB
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws TypeError when the options are not a plain object.
 * @throws Error when the number of weights differs from the pool maximum size, or when the measurement is not one of the `Measurements` values.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  const hasWrongWeightsCount =
    weights != null && Object.keys(weights).length !== this.maxSize
  if (hasWrongWeightsCount) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  const hasUnknownMeasurement =
    measurement != null && !Object.values(Measurements).includes(measurement)
  if (hasUnknownMeasurement) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
225
a20f0ba5
JB
/**
 * Validates the tasks queue options. Nullish options and a nullish concurrency are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws TypeError when the options are not a plain object or the concurrency is not a safe integer.
 * @throws Error when the concurrency is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError('Invalid worker tasks concurrency: must be an integer')
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
249
/** @inheritDoc */
public get info (): PoolInfo {
  const workerNodes = this.workerNodes
  // Sums one numeric value read from each worker node, in worker node order.
  const sumOverNodes = (
    read: (workerNode: WorkerNode<Worker, Data>) => number
  ): number => {
    let total = 0
    for (const workerNode of workerNodes) {
      total += read(workerNode)
    }
    return total
  }
  // The utilization is reported only when both run time and wait time aggregates are required.
  const statisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const utilizationInfo =
    statisticsRequirements.runTime.aggregate &&
    statisticsRequirements.waitTime.aggregate
      ? { utilization: round(this.utilization) }
      : {}
  return {
    version,
    type: this.type,
    worker: this.worker,
    minSize: this.minSize,
    maxSize: this.maxSize,
    ...utilizationInfo,
    workerNodes: workerNodes.length,
    idleWorkerNodes: workerNodes.filter(
      workerNode => workerNode.usage.tasks.executing === 0
    ).length,
    busyWorkerNodes: workerNodes.filter(
      workerNode => workerNode.usage.tasks.executing > 0
    ).length,
    executedTasks: sumOverNodes(workerNode => workerNode.usage.tasks.executed),
    executingTasks: sumOverNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    queuedTasks: sumOverNodes(workerNode => workerNode.usage.tasks.queued),
    maxQueuedTasks: sumOverNodes(
      workerNode => workerNode.usage.tasks.maxQueued
    ),
    failedTasks: sumOverNodes(workerNode => workerNode.usage.tasks.failed)
  }
}
08f3f44c 302
afe0d5bf
JB
/**
 * Gets the approximate pool utilization: the summed tasks run time and wait time
 * of all worker nodes, divided by the time elapsed since the pool start multiplied
 * by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolRunTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime.aggregate
    totalTasksWaitTime += usage.waitTime.aggregate
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
}
323
8881ae32
JB
/**
 * The pool type.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 */
protected abstract get type (): PoolType
330
184855e6
JB
/**
 * The type of worker used by the pool.
 */
protected abstract get worker (): WorkerType
335
/**
 * The pool minimum size.
 */
protected abstract get minSize (): number
ff733df7
JB
340
/**
 * The pool maximum size.
 */
protected abstract get maxSize (): number
a35560ba 345
f59e1027
JB
/**
 * Looks up a worker by its id.
 *
 * @param workerId - The worker id.
 * @returns The worker of the first worker node whose info id matches, `undefined` when none matches.
 */
private getWorkerById (workerId: number): Worker | undefined {
  for (const { info, worker } of this.workerNodes) {
    if (info.id === workerId) {
      return worker
    }
  }
  return undefined
}
356
/**
 * Gets the worker node key of the given worker, i.e. its index in the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
368
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy)
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Refresh the usage and the statistics setup of every worker node for the new strategy.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    const workerUsage = this.getWorkerUsage(workerNodeKey)
    this.setWorkerNodeTasksUsage(workerNode, workerUsage)
    this.setWorkerStatistics(workerNode.worker)
  })
}
390
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Reject invalid options before anything is stored.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(workerChoiceStrategyOptions)
}
401
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  // Flush the tasks queues when the tasks queue gets switched off.
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
413
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop any previously stored options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  const builtOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtOptions
}
424
/**
 * Builds the tasks queue options, defaulting the concurrency to `1`.
 *
 * @param tasksQueueOptions - The tasks queue options given by the user, possibly nullish.
 * @returns The tasks queue options holding the concurrency only.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
432
c319c66b
JB
/**
 * Whether the pool is full or not, i.e. whether the number of worker nodes has reached the pool maximum size.
 *
 * The pool filling boolean status.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
c2ade475 441
c319c66b
JB
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status.
 */
protected abstract get busy (): boolean
7c0ba920 448
6c6afb84
JB
/**
 * Whether every worker node is executing at least one task.
 *
 * @returns `true` when no worker node has zero executing tasks (also when there is no worker node), `false` otherwise.
 */
protected internalBusy (): boolean {
  const hasIdleWorkerNode = this.workerNodes.some(
    workerNode => workerNode.usage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
461
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  // Taken first so the wait time covers the worker node choice as well.
  const timestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const task: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp,
    id: crypto.randomUUID()
  }
  // Register the promise callbacks under the task id so the worker response can settle it.
  const response = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(task.id as string, {
      resolve,
      reject,
      worker: this.workerNodes[workerNodeKey].worker
    })
  })
  // Queue the task when the queue is enabled and either the pool is busy or
  // the chosen worker node already runs as many tasks as the concurrency allows.
  const shouldEnqueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      this.workerNodes[workerNodeKey].usage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (shouldEnqueue) {
    this.enqueueTask(workerNodeKey, task)
  } else {
    this.executeTask(workerNodeKey, task)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return response
}
c97c7edb 495
/** @inheritDoc */
public async destroy (): Promise<void> {
  const terminations = this.workerNodes.map(
    async ({ worker }, workerNodeKey) => {
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      // Listen for the exit before asking for the destruction so the event cannot be missed.
      const exited = new Promise<void>(resolve => {
        worker.on('exit', () => {
          resolve()
        })
      })
      await this.destroyWorker(worker)
      await exited
    }
  )
  await Promise.all(terminations)
}
512
/**
 * Terminates the given worker.
 *
 * @param worker - A worker within `workerNodes`.
 */
protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 519
/**
 * Hook called by the abstract constructor right before the worker nodes are created.
 * Does nothing by default. Can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
c97c7edb 529
/**
 * Should return whether the worker is the main worker or not.
 */
protected abstract isMain (): boolean
534
/**
 * Hook executed before the worker task execution: counts the task as executing
 * and updates the wait time statistics of the worker node.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const { usage } = this.workerNodes[workerNodeKey]
  ++usage.tasks.executing
  this.updateWaitTimeWorkerUsage(usage, task)
}
550
/**
 * Hook executed after the worker task execution: updates the tasks, run time
 * and ELU statistics of the worker node.
 * Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const { usage } = this.workerNodes[workerNodeKey]
  this.updateTaskStatisticsWorkerUsage(usage, message)
  this.updateRunTimeWorkerUsage(usage, message)
  this.updateEluWorkerUsage(usage, message)
}
567
/**
 * Moves one task from executing to executed in the worker usage, also counting it as failed when the message carries a task error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  --tasks.executing
  ++tasks.executed
  const hasFailed = message.taskError != null
  if (hasFailed) {
    ++tasks.failed
  }
}
579
a4e07f72
JB
/**
 * Updates the run time statistics of a worker usage with the task run time carried by a worker message.
 * Nothing is updated unless the current worker choice strategy requires the run time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  if (!runTimeRequirements.aggregate) {
    return
  }
  const taskRunTime = message.taskPerformance?.runTime
  workerUsage.runTime.aggregate += taskRunTime ?? 0
  // The average is taken over the tasks that did not fail. Guard on that very
  // divisor: when every executed task failed it is zero and the division
  // would yield NaN or Infinity.
  const succeededTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (runTimeRequirements.average && succeededTasks > 0) {
    workerUsage.runTime.average = workerUsage.runTime.aggregate / succeededTasks
  }
  if (runTimeRequirements.median && taskRunTime != null) {
    workerUsage.runTime.history.push(taskRunTime)
    workerUsage.runTime.median = median(workerUsage.runTime.history)
  }
}
608
a4e07f72
JB
/**
 * Updates the wait time statistics of a worker usage with the time the task waited,
 * measured from the task timestamp to now (zero when the task has no timestamp).
 * Nothing is updated unless the current worker choice strategy requires the wait time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const timestamp = performance.now()
  const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  if (!waitTimeRequirements.aggregate) {
    return
  }
  workerUsage.waitTime.aggregate += taskWaitTime
  // The average is taken over the tasks that did not fail. Guard on that very
  // divisor: when every executed task failed it is zero and the division
  // would yield NaN or Infinity.
  const succeededTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (waitTimeRequirements.average && succeededTasks > 0) {
    workerUsage.waitTime.average =
      workerUsage.waitTime.aggregate / succeededTasks
  }
  if (waitTimeRequirements.median) {
    workerUsage.waitTime.history.push(taskWaitTime)
    workerUsage.waitTime.median = median(workerUsage.waitTime.history)
  }
}
639
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage with the task performance carried by a worker message.
 * Nothing is updated unless the current worker choice strategy requires the ELU aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  // Without an ELU statistics holder there is nothing to update, and
  // dereferencing it below would throw.
  if (!eluRequirements.aggregate || workerUsage.elu == null) {
    return
  }
  const taskElu = message.taskPerformance?.elu
  if (taskElu != null) {
    workerUsage.elu.idle.aggregate += taskElu.idle
    workerUsage.elu.active.aggregate += taskElu.active
    // The stored utilization is the mean of the previous value and the task one.
    workerUsage.elu.utilization =
      (workerUsage.elu.utilization + taskElu.utilization) / 2
  }
  // The averages are taken over the tasks that did not fail. Guard on that very
  // divisor: when every executed task failed it is zero and the division
  // would yield NaN or Infinity.
  const succeededTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (eluRequirements.average && succeededTasks > 0) {
    workerUsage.elu.idle.average =
      workerUsage.elu.idle.aggregate / succeededTasks
    workerUsage.elu.active.average =
      workerUsage.elu.active.aggregate / succeededTasks
  }
  if (eluRequirements.median && taskElu != null) {
    workerUsage.elu.idle.history.push(taskElu.idle)
    workerUsage.elu.active.history.push(taskElu.active)
    workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
    workerUsage.elu.active.median = median(workerUsage.elu.active.history)
  }
}
684
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker is created first when the conditions for it are met; its
 * worker node is chosen when the strategy policy uses dynamic workers.
 * Otherwise the choice is delegated to the worker choice strategy context.
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  const strategyPolicy = this.workerChoiceStrategyContext.getStrategyPolicy()
  return strategyPolicy.useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
703
6c6afb84
JB
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full,
 * and every worker node is executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
712
/**
 * Sends a message to the given worker.
 *
 * @param worker - The worker which should receive the message.
 * @param message - The message.
 */
protected abstract sendToWorker (
  worker: Worker,
  message: MessageValue<Data>
): void
723
/**
 * Registers a listener callback for the `'message'` event of the given worker.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
private registerWorkerMessageListener<Message extends Data | Response>(
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void {
  const messageHandler = listener as MessageHandler<Worker>
  worker.on('message', messageHandler)
}
c97c7edb 736
/**
 * Creates a new worker.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 743
/**
 * Hook called once a worker has been newly created and pushed to the pool worker nodes.
 * By default, it registers the pool worker listener on the worker messages.
 * Can be overridden.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  const listener = this.workerListener()
  this.registerWorkerMessageListener(worker, listener)
}
c97c7edb 754
/**
 * Creates a new worker and sets it up completely in the pool worker nodes:
 * registers the user handlers and the pool internal handlers, pushes the
 * worker to the worker nodes, then runs the after-setup hook.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()

  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    if (this.emitter != null) {
      this.emitter.emit(PoolEvents.error, error)
    }
    if (this.opts.enableTasksQueue === true) {
      // Move the tasks queued on the failing worker node to the other worker nodes.
      const workerNodeKey = this.getWorkerNodeKey(worker)
      // A key of -1 means the worker is no longer in the pool worker nodes.
      while (workerNodeKey !== -1 && this.tasksQueueSize(workerNodeKey) > 0) {
        let targetWorkerNodeKey = -1
        let minQueuedTasks = Infinity
        for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
          if (workerNodeId === workerNodeKey) {
            continue
          }
          // A worker node with an empty queue is the best possible target.
          if (workerNode.usage.tasks.queued === 0) {
            targetWorkerNodeKey = workerNodeId
            break
          }
          // Otherwise remember the worker node with the fewest queued tasks.
          if (workerNode.usage.tasks.queued < minQueuedTasks) {
            minQueuedTasks = workerNode.usage.tasks.queued
            targetWorkerNodeKey = workerNodeId
          }
        }
        if (targetWorkerNodeKey === -1) {
          // No other worker node: re-enqueuing on the failing worker node would
          // never shrink its queue and this loop would not terminate.
          break
        }
        this.enqueueTask(
          targetWorkerNodeKey,
          this.dequeueTask(workerNodeKey) as Task<Data>
        )
      }
    }
    if (this.opts.restartWorkerOnError === true) {
      this.createAndSetupWorker()
    }
  })
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)

  this.setWorkerStatistics(worker)

  this.afterWorkerSetup(worker)

  return worker
}
be0676b3 814
930dcf12
JB
/**
 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
 * On top of the regular setup, the worker gets a message listener that destroys
 * it when it sends a kill message and the conditions below are met.
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    // A hard kill behavior always destroys the worker.
    let shallDestroy = isKillBehavior(KillBehaviors.HARD, message.kill)
    if (!shallDestroy && message.kill != null) {
      // Any other kill message destroys the worker only once it has nothing
      // left to do: no executing task and, with the tasks queue, no queued task.
      if (this.opts.enableTasksQueue === false) {
        shallDestroy =
          this.workerNodes[workerNodeKey].usage.tasks.executing === 0
      } else if (this.opts.enableTasksQueue === true) {
        shallDestroy =
          this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
          this.tasksQueueSize(workerNodeKey) === 0
      }
    }
    if (shallDestroy) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  return worker
}
839
/**
 * Builds the listener registered for each worker message. It dispatches
 * worker started messages and task execution responses to their handlers;
 * any other message is ignored.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    const isWorkerStartedMessage =
      message.workerId != null && message.started != null
    if (isWorkerStartedMessage) {
      // Worker started message received
      this.handleWorkerStartedMessage(message)
      return
    }
    if (message.id != null) {
      // Task execution response received
      this.handleTaskExecutionResponse(message)
    }
  }
}
856
/**
 * Handles a worker started message by recording the started status in the info of the matching worker node.
 *
 * @param message - The worker started message.
 * @throws Error when the message worker id matches no worker of the pool.
 */
private handleWorkerStartedMessage (message: MessageValue<Response>): void {
  const workerId = message.workerId as number
  const worker = this.getWorkerById(workerId)
  if (worker == null) {
    throw new Error(
      `Worker started message received from unknown worker '${workerId}'`
    )
  }
  const workerNodeKey = this.getWorkerNodeKey(worker)
  this.workerNodes[workerNodeKey].info.started = message.started as boolean
}
871
/**
 * Handles a task execution response: settles the promise registered under the
 * message id, updates the worker usage, then runs the next queued task of the
 * worker node, if any. Messages with an unknown id are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  const { worker, resolve, reject } = promiseResponse
  if (message.taskError != null) {
    if (this.emitter != null) {
      this.emitter.emit(PoolEvents.taskError, message.taskError)
    }
    reject(message.taskError.message)
  } else {
    resolve(message.data as Response)
  }
  this.afterTaskExecutionHook(worker, message)
  this.promiseResponseMap.delete(taskId)
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const hasQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0
  if (hasQueuedTask) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
7c0ba920 898
/**
 * Emits the `busy` event when the pool is busy and the `full` event when the
 * pool is dynamic and full, each with the pool info as payload.
 * Does nothing when the pool has no emitter.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  const isDynamicPoolFull = this.type === PoolTypes.dynamic && this.full
  if (isDynamicPoolFull) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
909
0ebe2a9f
JB
910 /**
911 * Sets the tasks usage of the given worker node in the pool.
912 *
913 * @param workerNode - The worker node.
a4e07f72 914 * @param workerUsage - The worker usage.
0ebe2a9f
JB
915 */
private setWorkerNodeTasksUsage (
  workerNode: WorkerNode<Worker, Data>,
  workerUsage: WorkerUsage
): void {
  // Replaces the whole usage object of the node; nothing is merged.
  workerNode.usage = workerUsage
}
922
a05c10de 923 /**
f06e48d8 924 * Pushes the given worker into the pool worker nodes.
ea7a90d3 925 *
38e795c1 926 * @param worker - The worker.
f06e48d8 927 * @returns The worker nodes length.
ea7a90d3 928 */
private pushWorkerNode (worker: Worker): number {
  // The node is first created with a usage that is not bound to any worker
  // node key, since its key is only known once it is in the array.
  const workerNode: WorkerNode<Worker, Data> = {
    worker,
    info: { id: this.getWorkerId(worker), started: true },
    usage: this.getWorkerUsage(),
    tasksQueue: new Queue<Task<Data>>()
  }
  this.workerNodes.push(workerNode)
  // Rebind the usage to the worker node key so that its queue getters read
  // the tasks queue of that node.
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const workerUsage = this.getWorkerUsage(workerNodeKey)
  this.setWorkerNodeTasksUsage(this.workerNodes[workerNodeKey], workerUsage)
  return this.workerNodes.length
}
c923ce56 943
83fa0a36
JB
944 /**
945 * Gets the worker id.
946 *
947 * @param worker - The worker.
948 * @returns The worker id.
949 */
private getWorkerId (worker: Worker): number | undefined {
  switch (this.worker) {
    case WorkerTypes.thread:
      return worker.threadId
    case WorkerTypes.cluster:
      return worker.id
    default:
      // Any other worker type has no id.
      return undefined
  }
}
957
8604aaab
JB
958 // /**
959 // * Sets the given worker in the pool worker nodes.
960 // *
961 // * @param workerNodeKey - The worker node key.
962 // * @param worker - The worker.
f59e1027 963 // * @param workerInfo - The worker info.
8604aaab
JB
964 // * @param workerUsage - The worker usage.
965 // * @param tasksQueue - The worker task queue.
966 // */
967 // private setWorkerNode (
968 // workerNodeKey: number,
969 // worker: Worker,
f59e1027 970 // workerInfo: WorkerInfo,
8604aaab
JB
971 // workerUsage: WorkerUsage,
972 // tasksQueue: Queue<Task<Data>>
973 // ): void {
974 // this.workerNodes[workerNodeKey] = {
975 // worker,
f59e1027
JB
976 // info: workerInfo,
977 // usage: workerUsage,
8604aaab
JB
978 // tasksQueue
979 // }
980 // }
51fe3d3c
JB
981
982 /**
f06e48d8 983 * Removes the given worker from the pool worker nodes.
51fe3d3c 984 *
f06e48d8 985 * @param worker - The worker.
51fe3d3c 986 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    // No worker node key was found for this worker: nothing to remove.
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
adc3c320 994
/**
 * Runs the before task execution hook for the given worker node key and
 * task, then sends the task to the worker of that worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
999
/**
 * Enqueues the given task in the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The number returned by the queue `enqueue()` call (presumably the new queue size - TODO confirm against `Queue`).
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.enqueue(task)
}
1003
/**
 * Dequeues a task from the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the queue returned none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.dequeue()
}
1007
/**
 * Gets the `size` of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.size
}
ff733df7 1011
df593701
JB
/**
 * Gets the `maxSize` of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue `maxSize` value.
 */
private tasksMaxQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.maxSize
}
1015
/**
 * Executes every task still queued on the given worker node, then clears
 * its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  let remainingTasks = this.tasksQueueSize(workerNodeKey)
  while (remainingTasks > 0) {
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, task)
    // Re-read the size on each pass instead of counting down locally.
    remainingTasks = this.tasksQueueSize(workerNodeKey)
  }
  this.workerNodes[workerNodeKey].tasksQueue.clear()
}
1025
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
b6b32453
JB
1031
/**
 * Sends the given worker a `statistics` message whose `runTime` and `elu`
 * fields are the `aggregate` values of the matching task statistics
 * requirements of the worker choice strategy context.
 *
 * @param worker - The worker to send the message to.
 */
private setWorkerStatistics (worker: Worker): void {
  const runTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const elu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  this.sendToWorker(worker, { statistics: { runTime, elu } })
}
8604aaab 1043
/**
 * Builds a fresh worker usage object: all counters and measurements are
 * zeroed and every measurement history is a new `CircularArray`.
 *
 * `tasks.queued` and `tasks.maxQueued` are getters: on each read they return
 * the current `size` / `maxSize` of the tasks queue of the given worker
 * node, or 0 when no worker node key was given.
 *
 * @param workerNodeKey - The worker node key the queue getters are bound to.
 * @returns The worker usage.
 */
private getWorkerUsage (workerNodeKey?: number): WorkerUsage {
  const readQueuedTasks = (): number =>
    workerNodeKey == null ? 0 : this.tasksQueueSize(workerNodeKey)
  const readMaxQueuedTasks = (): number =>
    workerNodeKey == null ? 0 : this.tasksMaxQueueSize(workerNodeKey)
  const tasks: WorkerUsage['tasks'] = {
    executed: 0,
    executing: 0,
    get queued (): number {
      return readQueuedTasks()
    },
    get maxQueued (): number {
      return readMaxQueuedTasks()
    },
    failed: 0
  }
  return {
    tasks,
    runTime: {
      aggregate: 0,
      average: 0,
      median: 0,
      history: new CircularArray()
    },
    waitTime: {
      aggregate: 0,
      average: 0,
      median: 0,
      history: new CircularArray()
    },
    elu: {
      idle: {
        aggregate: 0,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      active: {
        aggregate: 0,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      utilization: 0
    }
  }
}
c97c7edb 1092}