refactor: encapsulate worker node handling logic into its own class
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
2740a743 3import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
4import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
59317253 7 isKillBehavior,
0d80593b 8 isPlainObject,
afe0d5bf
JB
9 median,
10 round
bbeadd16 11} from '../utils'
59317253 12import { KillBehaviors } from '../worker/worker-options'
c4855468 13import {
65d7a1c9 14 type IPool,
7c5a1080 15 PoolEmitter,
c4855468 16 PoolEvents,
6b27d407 17 type PoolInfo,
c4855468 18 type PoolOptions,
6b27d407
JB
19 type PoolType,
20 PoolTypes,
4b628b48 21 type TasksQueueOptions
c4855468 22} from './pool'
e102732c
JB
23import type {
24 IWorker,
4b628b48 25 IWorkerNode,
e102732c
JB
26 MessageHandler,
27 Task,
8a1260a3 28 WorkerInfo,
4b628b48 29 WorkerType,
e102732c
JB
30 WorkerUsage
31} from './worker'
a35560ba 32import {
f0d7f803 33 Measurements,
a35560ba 34 WorkerChoiceStrategies,
a20f0ba5
JB
35 type WorkerChoiceStrategy,
36 type WorkerChoiceStrategyOptions
bdaf31cd
JB
37} from './selection-strategies/selection-strategies-types'
38import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 39import { version } from './version'
4b628b48 40import { WorkerNode } from './worker-node'
23ccf9d7 41
729c563d 42/**
ea7a90d3 43 * Base class that implements some shared logic for all poolifier pools.
729c563d 44 *
38e795c1 45 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
46 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
47 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 48 */
c97c7edb 49export abstract class AbstractPool<
f06e48d8 50 Worker extends IWorker,
d3c8a1a8
S
51 Data = unknown,
52 Response = unknown
c4855468 53> implements IPool<Worker, Data, Response> {
afc003b2 54 /** @inheritDoc */
4b628b48 55 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 56
afc003b2 57 /** @inheritDoc */
7c0ba920
JB
58 public readonly emitter?: PoolEmitter
59
be0676b3 60 /**
a3445496 61 * The execution response promise map.
be0676b3 62 *
2740a743 63 * - `key`: The message id of each submitted task.
a3445496 64 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 65 *
a3445496 66 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 67 */
c923ce56
JB
68 protected promiseResponseMap: Map<
69 string,
70 PromiseResponseWrapper<Worker, Response>
71 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 72
a35560ba 73 /**
51fe3d3c 74 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
75 */
76 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
77 Worker,
78 Data,
79 Response
a35560ba
S
80 >
81
afe0d5bf
JB
82 /**
83 * The start timestamp of the pool.
84 */
85 private readonly startTimestamp
86
/**
 * Builds a new poolifier pool.
 *
 * The arguments are validated first, then the optional event emitter and the worker choice
 * strategy context are created, the setup hook is run and the initial workers are spawned.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // Argument validation: checkPoolOptions() also fills in the option defaults.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bind the methods that are handed over as plain function references.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  // Each created worker adds a worker node, which eventually ends this loop.
  while (this.workerNodes.length < this.numberOfWorkers) {
    this.createAndSetupWorker()
  }

  this.startTimestamp = performance.now()
}
132
/**
 * Validates the path to the worker file.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is missing, is not a string or is blank.
 */
private checkFilePath (filePath: string): void {
  // The parameter is typed as a string, but untyped JavaScript callers can pass anything:
  // reject every non-string value (including null and undefined) instead of only nullish ones.
  if (typeof filePath !== 'string' || filePath.trim().length === 0) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
141
/**
 * Validates the number of workers the pool is asked to manage.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws Error if the number is missing, or is zero for a fixed pool.
 * @throws TypeError if the number is not a safe integer.
 * @throws RangeError if the number is negative.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  const isFixedPool = this.type === PoolTypes.fixed
  if (isFixedPool && numberOfWorkers === 0) {
    throw new Error('Cannot instantiate a fixed pool with no worker')
  }
}
159
/**
 * Validates the pool options and writes the defaulted values back into `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws TypeError if the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  const workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)

  const workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and defaulted when the queue is enabled.
  if (!this.opts.enableTasksQueue) {
    return
  }
  const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
186
/**
 * Ensures the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws Error if the strategy is not a known one.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
196
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if the number of weights differs from the pool maximum size or if the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  // When weights are given, their count must equal the pool maximum size.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
224
/**
 * Validates the tasks queue options. Missing options and a missing concurrency are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws TypeError if the options are not a plain object or the concurrency is not a safe integer.
 * @throws Error if the concurrency is zero or negative.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError('Invalid worker tasks concurrency: must be an integer')
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
248
/** @inheritDoc */
public get info (): PoolInfo {
  // The statistics requirements decide which optional sections are reported.
  const taskStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  // Sums a numeric value picked from each worker node.
  const sumOf = (
    select: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + select(workerNode),
      0
    )
  // Counts the worker nodes matching a predicate.
  const countIf = (
    predicate: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) =>
        predicate(workerNode) ? accumulator + 1 : accumulator,
      0
    )
  const executedTasks = sumOf(workerNode => workerNode.usage.tasks.executed)
  // Before any task has been executed the ratio would be 0 / 0: report 0 instead of NaN.
  const averageOf = (aggregate: number): number =>
    executedTasks === 0 ? 0 : aggregate / executedTasks
  return {
    version,
    type: this.type,
    worker: this.worker,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is computed from both run time and wait time aggregates.
    ...(taskStatisticsRequirements.runTime.aggregate &&
      taskStatisticsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countIf(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countIf(
      workerNode => workerNode.usage.tasks.executing > 0
    ),
    executedTasks,
    executingTasks: sumOf(workerNode => workerNode.usage.tasks.executing),
    queuedTasks: sumOf(workerNode => workerNode.usage.tasks.queued),
    maxQueuedTasks: sumOf(workerNode => workerNode.usage.tasks.maxQueued),
    failedTasks: sumOf(workerNode => workerNode.usage.tasks.failed),
    ...(taskStatisticsRequirements.runTime.aggregate && {
      runTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        average: round(
          averageOf(
            sumOf(workerNode => workerNode.usage.runTime?.aggregate ?? 0)
          )
        ),
        ...(taskStatisticsRequirements.runTime.median && {
          median: round(
            median(
              this.workerNodes.map(
                workerNode => workerNode.usage.runTime?.median ?? 0
              )
            )
          )
        })
      }
    }),
    ...(taskStatisticsRequirements.waitTime.aggregate && {
      waitTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        average: round(
          averageOf(
            sumOf(workerNode => workerNode.usage.waitTime?.aggregate ?? 0)
          )
        ),
        ...(taskStatisticsRequirements.waitTime.median && {
          median: round(
            median(
              this.workerNodes.map(
                workerNode => workerNode.usage.waitTime?.median ?? 0
              )
            )
          )
        })
      }
    })
  }
}
08f3f44c 383
/**
 * Approximate pool utilization: the aggregated tasks run time plus wait time, divided by the
 * time elapsed since the pool start multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolRunTimeCapacity = elapsedTime * this.maxSize
  // Two separate accumulators keep the summation order of each total unchanged.
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
}
404
8881ae32
JB
405 /**
406 * Pool type.
407 *
408 * If it is `'dynamic'`, it provides the `max` property.
409 */
410 protected abstract get type (): PoolType
411
184855e6
JB
412 /**
413 * Gets the worker type.
414 */
415 protected abstract get worker (): WorkerType
416
c2ade475 417 /**
6b27d407 418 * Pool minimum size.
c2ade475 419 */
6b27d407 420 protected abstract get minSize (): number
ff733df7
JB
421
422 /**
6b27d407 423 * Pool maximum size.
ff733df7 424 */
6b27d407 425 protected abstract get maxSize (): number
a35560ba 426
/**
 * Looks up a worker from its id.
 *
 * @param workerId - The worker id.
 * @returns The worker whose worker node info carries that id, `undefined` if there is none.
 */
private getWorkerById (workerId: number): Worker | undefined {
  const matchingWorkerNode = this.workerNodes.find(
    ({ info }) => info.id === workerId
  )
  return matchingWorkerNode?.worker
}
437
/**
 * Finds the key (the index in `workerNodes`) of the worker node holding the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
449
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy)
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset each worker node usage and set its worker statistics again under the new strategy.
  this.workerNodes.forEach(workerNode => {
    workerNode.resetUsage()
    this.setWorkerStatistics(workerNode.worker)
  })
}
468
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first so that invalid options never reach the pool options nor the context.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(workerChoiceStrategyOptions)
}
479
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  // Switching the queue off: flush the tasks queues before the option is changed.
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  if (isBeingDisabled) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
491
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Queue disabled: the given options are ignored and any stored ones are dropped.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
502
/**
 * Builds the effective tasks queue options.
 *
 * @param tasksQueueOptions - The tasks queue options given by the caller, possibly missing.
 * @returns Tasks queue options whose concurrency defaults to `1` when not given.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
510
/**
 * Whether the pool is full or not.
 *
 * The pool is full once the number of worker nodes reaches the pool maximum size.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
c2ade475 519
c319c66b
JB
520 /**
521 * Whether the pool is busy or not.
522 *
523 * The pool busyness boolean status.
524 */
525 protected abstract get busy (): boolean
7c0ba920 526
/**
 * Whether every worker node is executing at least one task.
 *
 * @returns `true` if no worker node has zero executing tasks, `false` otherwise.
 */
protected internalBusy (): boolean {
  const hasIdleWorkerNode = this.workerNodes.some(
    ({ usage }) => usage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
539
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  const timestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const id = randomUUID()
  const submittedTask: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp,
    id
  }
  // The resolve/reject callbacks are stored under the task id.
  const responsePromise = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(id, {
      resolve,
      reject,
      worker: this.workerNodes[workerNodeKey].worker
    })
  })
  // With the tasks queue enabled, the task is queued when the pool is busy or when the
  // chosen worker node already executes as many tasks as the configured concurrency.
  const shallEnqueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      this.workerNodes[workerNodeKey].usage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (shallEnqueue) {
    this.enqueueTask(workerNodeKey, submittedTask)
  } else {
    this.executeTask(workerNodeKey, submittedTask)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return responsePromise
}
c97c7edb 573
/** @inheritDoc */
public async destroy (): Promise<void> {
  const workerTerminations = this.workerNodes.map(
    async (workerNode, workerNodeKey) => {
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      // Listen for 'exit' before asking for the termination so the event cannot be missed.
      const workerExited = new Promise<void>(resolve => {
        workerNode.worker.on('exit', () => {
          resolve()
        })
      })
      await this.destroyWorker(workerNode.worker)
      await workerExited
    }
  )
  await Promise.all(workerTerminations)
}
590
4a6952ff 591 /**
6c6afb84 592 * Terminates the given worker.
4a6952ff 593 *
f06e48d8 594 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
595 */
596 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 597
/**
 * Hook called by the abstract constructor right before the worker nodes are created.
 * Does nothing by default; subclasses can override it.
 *
 * @virtual
 */
protected setupHook (): void {
  // Nothing to set up by default
}
c97c7edb 607
729c563d 608 /**
280c2a77
S
609 * Should return whether the worker is the main worker or not.
610 */
611 protected abstract isMain (): boolean
612
/**
 * Hook run before a task is executed by a worker: counts the task as executing on the worker
 * node and updates the worker node wait time usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const { usage } = this.workerNodes[workerNodeKey]
  usage.tasks.executing++
  this.updateWaitTimeWorkerUsage(usage, task)
}
628
/**
 * Hook run after a task has been executed by a worker: updates the task counters, the run time
 * and the event loop utilization usage of the worker node.
 * Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const { usage } = this.workerNodes[workerNodeKey]
  this.updateTaskStatisticsWorkerUsage(usage, message)
  this.updateRunTimeWorkerUsage(usage, message)
  this.updateEluWorkerUsage(usage, message)
}
645
/**
 * Updates the task counters of a worker usage once a task response has been received.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  tasks.executing--
  // A message carrying a task error counts as a failed task, any other one as an executed task.
  if (message.taskError != null) {
    tasks.failed++
  } else {
    tasks.executed++
  }
}
658
/**
 * Updates the run time statistics of a worker usage with the run time reported in the message.
 * Nothing is updated unless the worker choice strategy requires the run time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  if (!runTimeRequirements.aggregate) {
    return
  }
  const runTimeUsage = workerUsage.runTime
  const reportedRunTime = message.taskPerformance?.runTime
  // A message without a reported run time contributes a run time of 0.
  const taskRunTime = reportedRunTime ?? 0
  runTimeUsage.aggregate = (runTimeUsage.aggregate ?? 0) + taskRunTime
  runTimeUsage.minimum = Math.min(
    taskRunTime,
    runTimeUsage.minimum ?? Infinity
  )
  runTimeUsage.maximum = Math.max(
    taskRunTime,
    runTimeUsage.maximum ?? -Infinity
  )
  const executedTasks = workerUsage.tasks.executed
  if (runTimeRequirements.average && executedTasks !== 0) {
    runTimeUsage.average = runTimeUsage.aggregate / executedTasks
  }
  // The history only records run times that were actually reported.
  if (runTimeRequirements.median && reportedRunTime != null) {
    runTimeUsage.history.push(reportedRunTime)
    runTimeUsage.median = median(runTimeUsage.history)
  }
}
696
/**
 * Updates the wait time statistics of a worker usage with the time elapsed since the task
 * timestamp. Nothing is updated unless the worker choice strategy requires the wait time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  // A task without a timestamp has a wait time of 0.
  const taskWaitTime = now - (task.timestamp ?? now)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  if (!waitTimeRequirements.aggregate) {
    return
  }
  const waitTimeUsage = workerUsage.waitTime
  waitTimeUsage.aggregate = (waitTimeUsage.aggregate ?? 0) + taskWaitTime
  waitTimeUsage.minimum = Math.min(
    taskWaitTime,
    waitTimeUsage.minimum ?? Infinity
  )
  waitTimeUsage.maximum = Math.max(
    taskWaitTime,
    waitTimeUsage.maximum ?? -Infinity
  )
  const executedTasks = workerUsage.tasks.executed
  if (waitTimeRequirements.average && executedTasks !== 0) {
    waitTimeUsage.average = waitTimeUsage.aggregate / executedTasks
  }
  if (waitTimeRequirements.median) {
    waitTimeUsage.history.push(taskWaitTime)
    waitTimeUsage.median = median(waitTimeUsage.history)
  }
}
734
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage with the ELU reported
 * in the message. Nothing is updated unless the worker choice strategy requires the ELU
 * aggregate and the message carries an ELU measurement.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  const eluUsage = workerUsage.elu
  eluUsage.idle.aggregate = (eluUsage.idle.aggregate ?? 0) + taskElu.idle
  eluUsage.active.aggregate = (eluUsage.active.aggregate ?? 0) + taskElu.active
  // The stored utilization is the mean of the previous value and the newly reported one.
  eluUsage.utilization =
    eluUsage.utilization != null
      ? (eluUsage.utilization + taskElu.utilization) / 2
      : taskElu.utilization
  eluUsage.idle.minimum = Math.min(
    taskElu.idle,
    eluUsage.idle.minimum ?? Infinity
  )
  eluUsage.idle.maximum = Math.max(
    taskElu.idle,
    eluUsage.idle.maximum ?? -Infinity
  )
  eluUsage.active.minimum = Math.min(
    taskElu.active,
    eluUsage.active.minimum ?? Infinity
  )
  eluUsage.active.maximum = Math.max(
    taskElu.active,
    eluUsage.active.maximum ?? -Infinity
  )
  const executedTasks = workerUsage.tasks.executed
  if (eluRequirements.average && executedTasks !== 0) {
    eluUsage.idle.average = eluUsage.idle.aggregate / executedTasks
    eluUsage.active.average = eluUsage.active.aggregate / executedTasks
  }
  if (eluRequirements.median) {
    eluUsage.idle.history.push(taskElu.idle)
    eluUsage.active.history.push(taskElu.active)
    eluUsage.idle.median = median(eluUsage.idle.history)
    eluUsage.active.median = median(eluUsage.active.history)
  }
}
798
/**
 * Picks the worker node that will receive the next task.
 *
 * A dynamic worker is created first when the conditions are met; it gets the task if the
 * strategy policy says so. Otherwise the worker choice strategy decides.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
817
/**
 * Tells whether a dynamic worker shall be created: the pool must be dynamic, not full and
 * have all its worker nodes executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
826
280c2a77 827 /**
675bb809 828 * Sends a message to the given worker.
280c2a77 829 *
38e795c1
JB
830 * @param worker - The worker which should receive the message.
831 * @param message - The message.
280c2a77
S
832 */
833 protected abstract sendToWorker (
834 worker: Worker,
835 message: MessageValue<Data>
836 ): void
837
/**
 * Subscribes a callback to the `'message'` event of the given worker.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
private registerWorkerMessageListener<Message extends Data | Response>(
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void {
  const messageHandler = listener as MessageHandler<Worker>
  worker.on('message', messageHandler)
}
c97c7edb 850
729c563d 851 /**
41344292 852 * Creates a new worker.
6c6afb84
JB
853 *
854 * @returns Newly created worker.
729c563d 855 */
280c2a77 856 protected abstract createWorker (): Worker
c97c7edb 857
/**
 * Hook run once a newly created worker has been added to the pool worker nodes.
 * By default it registers the pool worker listener on the worker messages.
 * Can be overridden.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  const poolMessageListener = this.workerListener()
  this.registerWorkerMessageListener(worker, poolMessageListener)
}
c97c7edb 868
/**
 * Creates a new worker, attaches the user supplied and the pool internal event handlers to it,
 * and adds it to the pool worker nodes.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()

  // Pool internal reaction to a worker error: emit the event, move the queued tasks away
  // and, when configured, start a replacement worker of the same kind.
  const handleWorkerError = (error: Error): void => {
    if (this.emitter != null) {
      this.emitter.emit(PoolEvents.error, error)
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(worker)
    }
    if (this.opts.restartWorkerOnError !== true) {
      return
    }
    const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
    if (workerInfo.dynamic) {
      this.createAndSetupDynamicWorker()
    } else {
      this.createAndSetupWorker()
    }
  }

  // The registration order is kept as is: user handlers run before the pool internal ones.
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', handleWorkerError)
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)
  this.setWorkerStatistics(worker)
  this.afterWorkerSetup(worker)

  return worker
}
be0676b3 908
/**
 * Moves the tasks queued on the worker node of the given worker to the other worker nodes.
 *
 * Each task goes to the first other worker node without queued tasks or, failing that, to the
 * other worker node with the fewest queued tasks.
 *
 * @param worker - The worker whose queued tasks shall be redistributed.
 */
private redistributeQueuedTasks (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  // Nothing can be moved when the worker is no longer part of the pool (key -1) or when
  // there is no other worker node to receive the tasks.
  if (workerNodeKey === -1 || this.workerNodes.length <= 1) {
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey: number = workerNodeKey
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      if (workerNodeId === workerNodeKey) {
        continue
      }
      if (workerNode.usage.tasks.queued === 0) {
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (workerNode.usage.tasks.queued < minQueuedTasks) {
        minQueuedTasks = workerNode.usage.tasks.queued
        targetWorkerNodeKey = workerNodeId
      }
    }
    // Re-enqueueing a task on its own worker node would leave the queue size unchanged
    // and loop forever: stop instead.
    if (targetWorkerNodeKey === workerNodeKey) {
      break
    }
    this.enqueueTask(
      targetWorkerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
936
930dcf12
JB
/**
 * Creates a worker through `createAndSetupWorker()`, flags its worker info as dynamic
 * and registers a message listener destroying it once it asks to be killed.
 *
 * A hard kill behavior destroys the worker unconditionally. Any other kill message
 * destroys it only if it executes no task and, when the tasks queue is enabled,
 * its tasks queue is empty.
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic = true
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    // Evaluated lazily so that the worker node is only inspected when a non hard kill message is received.
    const shouldDestroyWorker = (): boolean => {
      if (isKillBehavior(KillBehaviors.HARD, message.kill)) {
        return true
      }
      if (message.kill == null) {
        return false
      }
      if (this.opts.enableTasksQueue === false) {
        return this.workerNodes[workerNodeKey].usage.tasks.executing === 0
      }
      if (this.opts.enableTasksQueue === true) {
        return (
          this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
          this.tasksQueueSize(workerNodeKey) === 0
        )
      }
      return false
    }
    if (shouldDestroyWorker()) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  // Let the worker know that it is a dynamic one.
  this.sendToWorker(worker, { dynamic: true })
  return worker
}
963
/**
 * Builds the listener registered for each worker message.
 *
 * The listener dispatches worker started messages (`workerId` and `started` set)
 * and task execution responses (`id` set) to their handler; any other message is ignored.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message: MessageValue<Response>): void => {
    const isWorkerStartedMessage =
      message.workerId != null && message.started != null
    if (isWorkerStartedMessage) {
      // Worker started message received
      this.handleWorkerStartedMessage(message)
      return
    }
    if (message.id != null) {
      // Task execution response received
      this.handleTaskExecutionResponse(message)
    }
  }
}
980
/**
 * Records the started status carried by a worker started message in the
 * information of the worker node owning the emitting worker.
 *
 * @param message - The worker started message.
 * @throws Error if no worker matches the message worker id.
 */
private handleWorkerStartedMessage (message: MessageValue<Response>): void {
  const worker = this.getWorkerById(message.workerId as number)
  if (worker == null) {
    throw new Error(
      `Worker started message received from unknown worker '${
        message.workerId as number
      }'`
    )
  }
  const workerNode = this.workerNodes[this.getWorkerNodeKey(worker)]
  workerNode.info.started = message.started as boolean
}
995
/**
 * Settles the promise awaiting the task identified by the message id, then
 * runs the post execution bookkeeping for the worker that executed it.
 *
 * Messages whose id matches no pending promise response are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const pendingResponse = this.promiseResponseMap.get(taskId)
  if (pendingResponse == null) {
    return
  }
  const { taskError } = message
  if (taskError != null) {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    // The promise is rejected with the error message only.
    pendingResponse.reject(taskError.message)
  } else {
    pendingResponse.resolve(message.data as Response)
  }
  this.afterTaskExecutionHook(pendingResponse.worker, message)
  this.promiseResponseMap.delete(taskId)
  const workerNodeKey = this.getWorkerNodeKey(pendingResponse.worker)
  const hasQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0
  if (hasQueuedTask) {
    // The worker just completed a task: feed it with the next task of its queue.
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
7c0ba920 1022
/**
 * Emits the `busy` event when the pool is busy and the `full` event when a
 * dynamic pool is full, each with the pool information as payload.
 * Does nothing if the pool has no emitter.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  const isDynamicPoolFull = this.type === PoolTypes.dynamic && this.full
  if (isDynamicPoolFull) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
1033
8a1260a3
JB
/**
 * Retrieves the information held by the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
private getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const { info } = this.workerNodes[workerNodeKey]
  return info
}
1042
/**
 * Wraps the given worker into a new worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker nodes length once the worker node is added.
 */
private pushWorkerNode (worker: Worker): number {
  const workerNodesLength = this.workerNodes.push(
    new WorkerNode(worker, this.worker)
  )
  return workerNodesLength
}
c923ce56 1052
/**
 * Deletes the worker node of the given worker from the pool worker nodes and
 * from the worker choice strategy context. Does nothing if the worker has no worker node.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    // Worker not found in the pool worker nodes.
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
adc3c320 1065
/**
 * Runs the before task execution hook, then sends the task to the worker of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
1070
/**
 * Adds the task to the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()` method.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}
1074
/**
 * Takes a task out of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the worker node returned none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1078
/**
 * Gets the size of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1082
/**
 * Executes every task queued on the given worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1092
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
b6b32453
JB
1098
/**
 * Sends to the given worker the statistics settings taken from the task statistics
 * requirements of the worker choice strategy context: the `aggregate` value of the
 * run time and of the ELU requirements.
 *
 * @param worker - The worker receiving the statistics settings.
 */
private setWorkerStatistics (worker: Worker): void {
  const runTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const elu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  this.sendToWorker(worker, {
    statistics: { runTime, elu }
  })
}
c97c7edb 1110}