test: add test for isAsyncFunction() helper
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
fc3e6586 1import crypto from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
2740a743 3import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
4import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
59317253 7 isKillBehavior,
0d80593b 8 isPlainObject,
afe0d5bf
JB
9 median,
10 round
bbeadd16 11} from '../utils'
59317253 12import { KillBehaviors } from '../worker/worker-options'
65d7a1c9 13import { CircularArray } from '../circular-array'
29ee7e9a 14import { Queue } from '../queue'
c4855468 15import {
65d7a1c9 16 type IPool,
7c5a1080 17 PoolEmitter,
c4855468 18 PoolEvents,
6b27d407 19 type PoolInfo,
c4855468 20 type PoolOptions,
6b27d407
JB
21 type PoolType,
22 PoolTypes,
184855e6 23 type TasksQueueOptions,
83fa0a36
JB
24 type WorkerType,
25 WorkerTypes
c4855468 26} from './pool'
e102732c
JB
27import type {
28 IWorker,
29 MessageHandler,
30 Task,
31 WorkerNode,
32 WorkerUsage
33} from './worker'
a35560ba 34import {
f0d7f803 35 Measurements,
a35560ba 36 WorkerChoiceStrategies,
a20f0ba5
JB
37 type WorkerChoiceStrategy,
38 type WorkerChoiceStrategyOptions
bdaf31cd
JB
39} from './selection-strategies/selection-strategies-types'
40import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
c97c7edb 41
729c563d 42/**
ea7a90d3 43 * Base class that implements some shared logic for all poolifier pools.
729c563d 44 *
38e795c1 45 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
46 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
47 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 48 */
c97c7edb 49export abstract class AbstractPool<
f06e48d8 50 Worker extends IWorker,
d3c8a1a8
S
51 Data = unknown,
52 Response = unknown
c4855468 53> implements IPool<Worker, Data, Response> {
afc003b2 54 /** @inheritDoc */
f06e48d8 55 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
4a6952ff 56
afc003b2 57 /** @inheritDoc */
7c0ba920
JB
58 public readonly emitter?: PoolEmitter
59
be0676b3 60 /**
a3445496 61 * The execution response promise map.
be0676b3 62 *
2740a743 63 * - `key`: The message id of each submitted task.
a3445496 64 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 65 *
a3445496 66 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 67 */
c923ce56
JB
68 protected promiseResponseMap: Map<
69 string,
70 PromiseResponseWrapper<Worker, Response>
71 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 72
a35560ba 73 /**
51fe3d3c 74 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
75 */
76 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
77 Worker,
78 Data,
79 Response
a35560ba
S
80 >
81
afe0d5bf
JB
82 /**
83 * The start timestamp of the pool.
84 */
85 private readonly startTimestamp
86
729c563d
S
87 /**
88 * Constructs a new poolifier pool.
89 *
38e795c1 90 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 91 * @param filePath - Path to the worker file.
38e795c1 92 * @param opts - Options for the pool.
729c563d 93 */
c97c7edb 94 public constructor (
b4213b7f
JB
95 protected readonly numberOfWorkers: number,
96 protected readonly filePath: string,
97 protected readonly opts: PoolOptions<Worker>
c97c7edb 98 ) {
78cea37e 99 if (!this.isMain()) {
c97c7edb
S
100 throw new Error('Cannot start a pool from a worker!')
101 }
8d3782fa 102 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 103 this.checkFilePath(this.filePath)
7c0ba920 104 this.checkPoolOptions(this.opts)
1086026a 105
7254e419
JB
106 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
107 this.executeTask = this.executeTask.bind(this)
108 this.enqueueTask = this.enqueueTask.bind(this)
109 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
1086026a 110
6bd72cd0 111 if (this.opts.enableEvents === true) {
7c0ba920
JB
112 this.emitter = new PoolEmitter()
113 }
d59df138
JB
114 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
115 Worker,
116 Data,
117 Response
da309861
JB
118 >(
119 this,
120 this.opts.workerChoiceStrategy,
121 this.opts.workerChoiceStrategyOptions
122 )
b6b32453
JB
123
124 this.setupHook()
125
afe0d5bf 126 while (this.workerNodes.length < this.numberOfWorkers) {
b6b32453
JB
127 this.createAndSetupWorker()
128 }
afe0d5bf
JB
129
130 this.startTimestamp = performance.now()
c97c7edb
S
131 }
132
a35560ba 133 private checkFilePath (filePath: string): void {
ffcbbad8
JB
134 if (
135 filePath == null ||
136 (typeof filePath === 'string' && filePath.trim().length === 0)
137 ) {
c510fea7
APA
138 throw new Error('Please specify a file with a worker implementation')
139 }
140 }
141
8d3782fa
JB
142 private checkNumberOfWorkers (numberOfWorkers: number): void {
143 if (numberOfWorkers == null) {
144 throw new Error(
145 'Cannot instantiate a pool without specifying the number of workers'
146 )
78cea37e 147 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 148 throw new TypeError(
0d80593b 149 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
150 )
151 } else if (numberOfWorkers < 0) {
473c717a 152 throw new RangeError(
8d3782fa
JB
153 'Cannot instantiate a pool with a negative number of workers'
154 )
6b27d407 155 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
8d3782fa
JB
156 throw new Error('Cannot instantiate a fixed pool with no worker')
157 }
158 }
159
7c0ba920 160 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
161 if (isPlainObject(opts)) {
162 this.opts.workerChoiceStrategy =
163 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
164 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
165 this.opts.workerChoiceStrategyOptions =
166 opts.workerChoiceStrategyOptions ??
167 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
49be33fe
JB
168 this.checkValidWorkerChoiceStrategyOptions(
169 this.opts.workerChoiceStrategyOptions
170 )
1f68cede 171 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
172 this.opts.enableEvents = opts.enableEvents ?? true
173 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
174 if (this.opts.enableTasksQueue) {
175 this.checkValidTasksQueueOptions(
176 opts.tasksQueueOptions as TasksQueueOptions
177 )
178 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
179 opts.tasksQueueOptions as TasksQueueOptions
180 )
181 }
182 } else {
183 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 184 }
aee46736
JB
185 }
186
187 private checkValidWorkerChoiceStrategy (
188 workerChoiceStrategy: WorkerChoiceStrategy
189 ): void {
190 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 191 throw new Error(
aee46736 192 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
193 )
194 }
7c0ba920
JB
195 }
196
0d80593b
JB
197 private checkValidWorkerChoiceStrategyOptions (
198 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
199 ): void {
200 if (!isPlainObject(workerChoiceStrategyOptions)) {
201 throw new TypeError(
202 'Invalid worker choice strategy options: must be a plain object'
203 )
204 }
49be33fe
JB
205 if (
206 workerChoiceStrategyOptions.weights != null &&
6b27d407 207 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
208 ) {
209 throw new Error(
210 'Invalid worker choice strategy options: must have a weight for each worker node'
211 )
212 }
f0d7f803
JB
213 if (
214 workerChoiceStrategyOptions.measurement != null &&
215 !Object.values(Measurements).includes(
216 workerChoiceStrategyOptions.measurement
217 )
218 ) {
219 throw new Error(
220 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
221 )
222 }
0d80593b
JB
223 }
224
a20f0ba5
JB
225 private checkValidTasksQueueOptions (
226 tasksQueueOptions: TasksQueueOptions
227 ): void {
0d80593b
JB
228 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
229 throw new TypeError('Invalid tasks queue options: must be a plain object')
230 }
f0d7f803
JB
231 if (
232 tasksQueueOptions?.concurrency != null &&
233 !Number.isSafeInteger(tasksQueueOptions.concurrency)
234 ) {
235 throw new TypeError(
236 'Invalid worker tasks concurrency: must be an integer'
237 )
238 }
239 if (
240 tasksQueueOptions?.concurrency != null &&
241 tasksQueueOptions.concurrency <= 0
242 ) {
a20f0ba5 243 throw new Error(
f0d7f803 244 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
a20f0ba5
JB
245 )
246 }
247 }
248
08f3f44c 249 /** @inheritDoc */
6b27d407
JB
250 public get info (): PoolInfo {
251 return {
252 type: this.type,
184855e6 253 worker: this.worker,
6b27d407
JB
254 minSize: this.minSize,
255 maxSize: this.maxSize,
c05f0d50
JB
256 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
257 .runTime.aggregate &&
1305e9a8
JB
258 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
259 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
260 workerNodes: this.workerNodes.length,
261 idleWorkerNodes: this.workerNodes.reduce(
262 (accumulator, workerNode) =>
f59e1027 263 workerNode.usage.tasks.executing === 0
a4e07f72
JB
264 ? accumulator + 1
265 : accumulator,
6b27d407
JB
266 0
267 ),
268 busyWorkerNodes: this.workerNodes.reduce(
269 (accumulator, workerNode) =>
f59e1027 270 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
271 0
272 ),
a4e07f72 273 executedTasks: this.workerNodes.reduce(
6b27d407 274 (accumulator, workerNode) =>
f59e1027 275 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
276 0
277 ),
278 executingTasks: this.workerNodes.reduce(
279 (accumulator, workerNode) =>
f59e1027 280 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
281 0
282 ),
283 queuedTasks: this.workerNodes.reduce(
df593701 284 (accumulator, workerNode) =>
f59e1027 285 accumulator + workerNode.usage.tasks.queued,
6b27d407
JB
286 0
287 ),
288 maxQueuedTasks: this.workerNodes.reduce(
289 (accumulator, workerNode) =>
f59e1027 290 accumulator + workerNode.usage.tasks.maxQueued,
6b27d407 291 0
a4e07f72
JB
292 ),
293 failedTasks: this.workerNodes.reduce(
294 (accumulator, workerNode) =>
f59e1027 295 accumulator + workerNode.usage.tasks.failed,
a4e07f72 296 0
6b27d407
JB
297 )
298 }
299 }
08f3f44c 300
afe0d5bf
JB
301 /**
302 * Gets the pool run time.
303 *
304 * @returns The pool run time in milliseconds.
305 */
306 private get runTime (): number {
307 return performance.now() - this.startTimestamp
308 }
309
310 /**
311 * Gets the approximate pool utilization.
312 *
313 * @returns The pool utilization.
314 */
315 private get utilization (): number {
316 const poolRunTimeCapacity = this.runTime * this.maxSize
317 const totalTasksRunTime = this.workerNodes.reduce(
318 (accumulator, workerNode) =>
319 accumulator + workerNode.usage.runTime.aggregate,
320 0
321 )
322 const totalTasksWaitTime = this.workerNodes.reduce(
323 (accumulator, workerNode) =>
324 accumulator + workerNode.usage.waitTime.aggregate,
325 0
326 )
327 return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
328 }
329
8881ae32
JB
330 /**
331 * Pool type.
332 *
333 * If it is `'dynamic'`, it provides the `max` property.
334 */
335 protected abstract get type (): PoolType
336
184855e6
JB
337 /**
338 * Gets the worker type.
339 */
340 protected abstract get worker (): WorkerType
341
c2ade475 342 /**
6b27d407 343 * Pool minimum size.
c2ade475 344 */
6b27d407 345 protected abstract get minSize (): number
ff733df7
JB
346
347 /**
6b27d407 348 * Pool maximum size.
ff733df7 349 */
6b27d407 350 protected abstract get maxSize (): number
a35560ba 351
f59e1027
JB
352 /**
353 * Get the worker given its id.
354 *
355 * @param workerId - The worker id.
356 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
357 */
358 private getWorkerById (workerId: number): Worker | undefined {
359 return this.workerNodes.find(workerNode => workerNode.info.id === workerId)
360 ?.worker
361 }
362
ffcbbad8 363 /**
f06e48d8 364 * Gets the given worker its worker node key.
ffcbbad8
JB
365 *
366 * @param worker - The worker.
f59e1027 367 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 368 */
f06e48d8
JB
369 private getWorkerNodeKey (worker: Worker): number {
370 return this.workerNodes.findIndex(
371 workerNode => workerNode.worker === worker
372 )
bf9549ae
JB
373 }
374
afc003b2 375 /** @inheritDoc */
a35560ba 376 public setWorkerChoiceStrategy (
59219cbb
JB
377 workerChoiceStrategy: WorkerChoiceStrategy,
378 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 379 ): void {
aee46736 380 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 381 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
382 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
383 this.opts.workerChoiceStrategy
384 )
385 if (workerChoiceStrategyOptions != null) {
386 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
387 }
9c16fb4b 388 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
8604aaab
JB
389 this.setWorkerNodeTasksUsage(
390 workerNode,
9c16fb4b 391 this.getWorkerUsage(workerNodeKey)
8604aaab 392 )
b6b32453 393 this.setWorkerStatistics(workerNode.worker)
59219cbb 394 }
a20f0ba5
JB
395 }
396
397 /** @inheritDoc */
398 public setWorkerChoiceStrategyOptions (
399 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
400 ): void {
0d80593b 401 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
a20f0ba5
JB
402 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
403 this.workerChoiceStrategyContext.setOptions(
404 this.opts.workerChoiceStrategyOptions
a35560ba
S
405 )
406 }
407
a20f0ba5 408 /** @inheritDoc */
8f52842f
JB
409 public enableTasksQueue (
410 enable: boolean,
411 tasksQueueOptions?: TasksQueueOptions
412 ): void {
a20f0ba5 413 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 414 this.flushTasksQueues()
a20f0ba5
JB
415 }
416 this.opts.enableTasksQueue = enable
8f52842f 417 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
418 }
419
420 /** @inheritDoc */
8f52842f 421 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 422 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
423 this.checkValidTasksQueueOptions(tasksQueueOptions)
424 this.opts.tasksQueueOptions =
425 this.buildTasksQueueOptions(tasksQueueOptions)
5baee0d7 426 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
427 delete this.opts.tasksQueueOptions
428 }
429 }
430
431 private buildTasksQueueOptions (
432 tasksQueueOptions: TasksQueueOptions
433 ): TasksQueueOptions {
434 return {
435 concurrency: tasksQueueOptions?.concurrency ?? 1
436 }
437 }
438
c319c66b
JB
439 /**
440 * Whether the pool is full or not.
441 *
442 * The pool filling boolean status.
443 */
dea903a8
JB
444 protected get full (): boolean {
445 return this.workerNodes.length >= this.maxSize
446 }
c2ade475 447
c319c66b
JB
448 /**
449 * Whether the pool is busy or not.
450 *
451 * The pool busyness boolean status.
452 */
453 protected abstract get busy (): boolean
7c0ba920 454
6c6afb84
JB
455 /**
456 * Whether worker nodes are executing at least one task.
457 *
458 * @returns Worker nodes busyness boolean status.
459 */
c2ade475 460 protected internalBusy (): boolean {
e0ae6100
JB
461 return (
462 this.workerNodes.findIndex(workerNode => {
f59e1027 463 return workerNode.usage.tasks.executing === 0
e0ae6100
JB
464 }) === -1
465 )
cb70b19d
JB
466 }
467
afc003b2 468 /** @inheritDoc */
a86b6df1 469 public async execute (data?: Data, name?: string): Promise<Response> {
b6b32453 470 const timestamp = performance.now()
20dcad1a 471 const workerNodeKey = this.chooseWorkerNode()
adc3c320 472 const submittedTask: Task<Data> = {
a86b6df1 473 name,
e5a5c0fc
JB
474 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
475 data: data ?? ({} as Data),
b6b32453 476 timestamp,
adc3c320
JB
477 id: crypto.randomUUID()
478 }
2e81254d 479 const res = new Promise<Response>((resolve, reject) => {
02706357 480 this.promiseResponseMap.set(submittedTask.id as string, {
2e81254d
JB
481 resolve,
482 reject,
20dcad1a 483 worker: this.workerNodes[workerNodeKey].worker
2e81254d
JB
484 })
485 })
ff733df7
JB
486 if (
487 this.opts.enableTasksQueue === true &&
7171d33f 488 (this.busy ||
f59e1027 489 this.workerNodes[workerNodeKey].usage.tasks.executing >=
7171d33f 490 ((this.opts.tasksQueueOptions as TasksQueueOptions)
3528c992 491 .concurrency as number))
ff733df7 492 ) {
26a929d7
JB
493 this.enqueueTask(workerNodeKey, submittedTask)
494 } else {
2e81254d 495 this.executeTask(workerNodeKey, submittedTask)
adc3c320 496 }
ff733df7 497 this.checkAndEmitEvents()
78cea37e 498 // eslint-disable-next-line @typescript-eslint/return-await
280c2a77
S
499 return res
500 }
c97c7edb 501
afc003b2 502 /** @inheritDoc */
c97c7edb 503 public async destroy (): Promise<void> {
1fbcaa7c 504 await Promise.all(
875a7c37
JB
505 this.workerNodes.map(async (workerNode, workerNodeKey) => {
506 this.flushTasksQueue(workerNodeKey)
47aacbaa 507 // FIXME: wait for tasks to be finished
920278a2
JB
508 const workerExitPromise = new Promise<void>(resolve => {
509 workerNode.worker.on('exit', () => {
510 resolve()
511 })
512 })
f06e48d8 513 await this.destroyWorker(workerNode.worker)
920278a2 514 await workerExitPromise
1fbcaa7c
JB
515 })
516 )
c97c7edb
S
517 }
518
4a6952ff 519 /**
6c6afb84 520 * Terminates the given worker.
4a6952ff 521 *
f06e48d8 522 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
523 */
524 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 525
729c563d 526 /**
6677a3d3
JB
527 * Setup hook to execute code before worker nodes are created in the abstract constructor.
528 * Can be overridden.
afc003b2
JB
529 *
530 * @virtual
729c563d 531 */
280c2a77 532 protected setupHook (): void {
d99ba5a8 533 // Intentionally empty
280c2a77 534 }
c97c7edb 535
729c563d 536 /**
280c2a77
S
537 * Should return whether the worker is the main worker or not.
538 */
539 protected abstract isMain (): boolean
540
541 /**
2e81254d 542 * Hook executed before the worker task execution.
bf9549ae 543 * Can be overridden.
729c563d 544 *
f06e48d8 545 * @param workerNodeKey - The worker node key.
1c6fe997 546 * @param task - The task to execute.
729c563d 547 */
1c6fe997
JB
548 protected beforeTaskExecutionHook (
549 workerNodeKey: number,
550 task: Task<Data>
551 ): void {
f59e1027 552 const workerUsage = this.workerNodes[workerNodeKey].usage
1c6fe997
JB
553 ++workerUsage.tasks.executing
554 this.updateWaitTimeWorkerUsage(workerUsage, task)
c97c7edb
S
555 }
556
c01733f1 557 /**
2e81254d 558 * Hook executed after the worker task execution.
bf9549ae 559 * Can be overridden.
c01733f1 560 *
c923ce56 561 * @param worker - The worker.
38e795c1 562 * @param message - The received message.
c01733f1 563 */
2e81254d 564 protected afterTaskExecutionHook (
c923ce56 565 worker: Worker,
2740a743 566 message: MessageValue<Response>
bf9549ae 567 ): void {
f59e1027 568 const workerUsage = this.workerNodes[this.getWorkerNodeKey(worker)].usage
f1c06930
JB
569 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
570 this.updateRunTimeWorkerUsage(workerUsage, message)
571 this.updateEluWorkerUsage(workerUsage, message)
572 }
573
574 private updateTaskStatisticsWorkerUsage (
575 workerUsage: WorkerUsage,
576 message: MessageValue<Response>
577 ): void {
a4e07f72
JB
578 const workerTaskStatistics = workerUsage.tasks
579 --workerTaskStatistics.executing
580 ++workerTaskStatistics.executed
82f36766 581 if (message.taskError != null) {
a4e07f72 582 ++workerTaskStatistics.failed
2740a743 583 }
f8eb0a2a
JB
584 }
585
a4e07f72
JB
586 private updateRunTimeWorkerUsage (
587 workerUsage: WorkerUsage,
f8eb0a2a
JB
588 message: MessageValue<Response>
589 ): void {
87de9ff5
JB
590 if (
591 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
932fc8be 592 .aggregate
87de9ff5 593 ) {
932fc8be 594 workerUsage.runTime.aggregate += message.taskPerformance?.runTime ?? 0
c6bd2650 595 if (
932fc8be
JB
596 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
597 .average &&
a4e07f72 598 workerUsage.tasks.executed !== 0
c6bd2650 599 ) {
a4e07f72 600 workerUsage.runTime.average =
f1c06930
JB
601 workerUsage.runTime.aggregate /
602 (workerUsage.tasks.executed - workerUsage.tasks.failed)
3032893a 603 }
3fa4cdd2 604 if (
932fc8be
JB
605 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
606 .median &&
d715b7bc 607 message.taskPerformance?.runTime != null
3fa4cdd2 608 ) {
a4e07f72
JB
609 workerUsage.runTime.history.push(message.taskPerformance.runTime)
610 workerUsage.runTime.median = median(workerUsage.runTime.history)
78099a15 611 }
3032893a 612 }
f8eb0a2a
JB
613 }
614
a4e07f72
JB
615 private updateWaitTimeWorkerUsage (
616 workerUsage: WorkerUsage,
1c6fe997 617 task: Task<Data>
f8eb0a2a 618 ): void {
1c6fe997
JB
619 const timestamp = performance.now()
620 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
87de9ff5
JB
621 if (
622 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
932fc8be 623 .aggregate
87de9ff5 624 ) {
932fc8be 625 workerUsage.waitTime.aggregate += taskWaitTime ?? 0
09a6305f 626 if (
87de9ff5 627 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 628 .waitTime.average &&
a4e07f72 629 workerUsage.tasks.executed !== 0
09a6305f 630 ) {
a4e07f72 631 workerUsage.waitTime.average =
f1c06930
JB
632 workerUsage.waitTime.aggregate /
633 (workerUsage.tasks.executed - workerUsage.tasks.failed)
09a6305f
JB
634 }
635 if (
87de9ff5 636 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 637 .waitTime.median &&
1c6fe997 638 taskWaitTime != null
09a6305f 639 ) {
1c6fe997 640 workerUsage.waitTime.history.push(taskWaitTime)
a4e07f72 641 workerUsage.waitTime.median = median(workerUsage.waitTime.history)
09a6305f 642 }
0567595a 643 }
c01733f1 644 }
645
a4e07f72 646 private updateEluWorkerUsage (
5df69fab 647 workerUsage: WorkerUsage,
62c15a68
JB
648 message: MessageValue<Response>
649 ): void {
5df69fab
JB
650 if (
651 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
652 .aggregate
653 ) {
654 if (workerUsage.elu != null && message.taskPerformance?.elu != null) {
9adcefab
JB
655 workerUsage.elu.idle.aggregate += message.taskPerformance.elu.idle
656 workerUsage.elu.active.aggregate += message.taskPerformance.elu.active
5df69fab
JB
657 workerUsage.elu.utilization =
658 (workerUsage.elu.utilization +
659 message.taskPerformance.elu.utilization) /
660 2
661 } else if (message.taskPerformance?.elu != null) {
662 workerUsage.elu.idle.aggregate = message.taskPerformance.elu.idle
663 workerUsage.elu.active.aggregate = message.taskPerformance.elu.active
664 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
665 }
d715b7bc 666 if (
5df69fab
JB
667 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
668 .average &&
669 workerUsage.tasks.executed !== 0
670 ) {
f1c06930
JB
671 const executedTasks =
672 workerUsage.tasks.executed - workerUsage.tasks.failed
5df69fab 673 workerUsage.elu.idle.average =
f1c06930 674 workerUsage.elu.idle.aggregate / executedTasks
5df69fab 675 workerUsage.elu.active.average =
f1c06930 676 workerUsage.elu.active.aggregate / executedTasks
5df69fab
JB
677 }
678 if (
679 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
680 .median &&
d715b7bc
JB
681 message.taskPerformance?.elu != null
682 ) {
5df69fab
JB
683 workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
684 workerUsage.elu.active.history.push(message.taskPerformance.elu.active)
685 workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
686 workerUsage.elu.active.median = median(workerUsage.elu.active.history)
62c15a68
JB
687 }
688 }
689 }
690
280c2a77 691 /**
f06e48d8 692 * Chooses a worker node for the next task.
280c2a77 693 *
6c6afb84 694 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 695 *
20dcad1a 696 * @returns The worker node key
280c2a77 697 */
6c6afb84 698 private chooseWorkerNode (): number {
930dcf12 699 if (this.shallCreateDynamicWorker()) {
6c6afb84
JB
700 const worker = this.createAndSetupDynamicWorker()
701 if (
702 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
703 ) {
704 return this.getWorkerNodeKey(worker)
705 }
17393ac8 706 }
930dcf12
JB
707 return this.workerChoiceStrategyContext.execute()
708 }
709
6c6afb84
JB
710 /**
711 * Conditions for dynamic worker creation.
712 *
713 * @returns Whether to create a dynamic worker or not.
714 */
715 private shallCreateDynamicWorker (): boolean {
930dcf12 716 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
717 }
718
280c2a77 719 /**
675bb809 720 * Sends a message to the given worker.
280c2a77 721 *
38e795c1
JB
722 * @param worker - The worker which should receive the message.
723 * @param message - The message.
280c2a77
S
724 */
725 protected abstract sendToWorker (
726 worker: Worker,
727 message: MessageValue<Data>
728 ): void
729
4a6952ff 730 /**
f06e48d8 731 * Registers a listener callback on the given worker.
4a6952ff 732 *
38e795c1
JB
733 * @param worker - The worker which should register a listener.
734 * @param listener - The message listener callback.
4a6952ff 735 */
e102732c
JB
736 private registerWorkerMessageListener<Message extends Data | Response>(
737 worker: Worker,
738 listener: (message: MessageValue<Message>) => void
739 ): void {
740 worker.on('message', listener as MessageHandler<Worker>)
741 }
c97c7edb 742
729c563d 743 /**
41344292 744 * Creates a new worker.
6c6afb84
JB
745 *
746 * @returns Newly created worker.
729c563d 747 */
280c2a77 748 protected abstract createWorker (): Worker
c97c7edb 749
729c563d 750 /**
f06e48d8 751 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
6677a3d3 752 * Can be overridden.
729c563d 753 *
38e795c1 754 * @param worker - The newly created worker.
729c563d 755 */
6677a3d3 756 protected afterWorkerSetup (worker: Worker): void {
e102732c
JB
757 // Listen to worker messages.
758 this.registerWorkerMessageListener(worker, this.workerListener())
759 }
c97c7edb 760
4a6952ff 761 /**
f06e48d8 762 * Creates a new worker and sets it up completely in the pool worker nodes.
4a6952ff
JB
763 *
764 * @returns New, completely set up worker.
765 */
766 protected createAndSetupWorker (): Worker {
bdacc2d2 767 const worker = this.createWorker()
280c2a77 768
35cf1c03 769 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 770 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1f68cede
JB
771 worker.on('error', error => {
772 if (this.emitter != null) {
773 this.emitter.emit(PoolEvents.error, error)
774 }
5bc91f3e
JB
775 if (this.opts.enableTasksQueue === true) {
776 const workerNodeKey = this.getWorkerNodeKey(worker)
777 while (this.tasksQueueSize(workerNodeKey) > 0) {
778 let targetWorkerNodeKey: number = workerNodeKey
779 let minQueuedTasks = Infinity
780 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
781 if (
782 workerNodeId !== workerNodeKey &&
783 workerNode.usage.tasks.queued === 0
784 ) {
785 targetWorkerNodeKey = workerNodeId
786 break
787 }
788 if (
789 workerNodeId !== workerNodeKey &&
790 workerNode.usage.tasks.queued < minQueuedTasks
791 ) {
792 minQueuedTasks = workerNode.usage.tasks.queued
793 targetWorkerNodeKey = workerNodeId
794 }
795 }
796 this.enqueueTask(
797 targetWorkerNodeKey,
798 this.dequeueTask(workerNodeKey) as Task<Data>
799 )
800 }
801 }
e8a4c3ea 802 if (this.opts.restartWorkerOnError === true) {
1f68cede 803 this.createAndSetupWorker()
5baee0d7
JB
804 }
805 })
a35560ba
S
806 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
807 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 808 worker.once('exit', () => {
f06e48d8 809 this.removeWorkerNode(worker)
a974afa6 810 })
280c2a77 811
f06e48d8 812 this.pushWorkerNode(worker)
280c2a77 813
b6b32453
JB
814 this.setWorkerStatistics(worker)
815
280c2a77
S
816 this.afterWorkerSetup(worker)
817
c97c7edb
S
818 return worker
819 }
be0676b3 820
930dcf12
JB
821 /**
822 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
823 *
824 * @returns New, completely set up dynamic worker.
825 */
826 protected createAndSetupDynamicWorker (): Worker {
827 const worker = this.createAndSetupWorker()
828 this.registerWorkerMessageListener(worker, message => {
e8b3a5ab 829 const workerNodeKey = this.getWorkerNodeKey(worker)
930dcf12
JB
830 if (
831 isKillBehavior(KillBehaviors.HARD, message.kill) ||
7b56f532
JB
832 (message.kill != null &&
833 ((this.opts.enableTasksQueue === false &&
f59e1027 834 this.workerNodes[workerNodeKey].usage.tasks.executing === 0) ||
7b56f532 835 (this.opts.enableTasksQueue === true &&
f59e1027 836 this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
e8b3a5ab 837 this.tasksQueueSize(workerNodeKey) === 0)))
930dcf12
JB
838 ) {
839 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
930dcf12
JB
840 void (this.destroyWorker(worker) as Promise<void>)
841 }
842 })
843 return worker
844 }
845
be0676b3 846 /**
ff733df7 847 * This function is the listener registered for each worker message.
be0676b3 848 *
bdacc2d2 849 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
850 */
851 protected workerListener (): (message: MessageValue<Response>) => void {
4a6952ff 852 return message => {
f59e1027
JB
853 if (message.workerId != null && message.started != null) {
854 // Worker started message received
6b272951 855 this.handleWorkerStartedMessage(message)
f59e1027 856 } else if (message.id != null) {
a3445496 857 // Task execution response received
6b272951
JB
858 this.handleTaskExecutionResponse(message)
859 }
860 }
861 }
862
863 private handleWorkerStartedMessage (message: MessageValue<Response>): void {
864 // Worker started message received
865 const worker = this.getWorkerById(message.workerId as number)
866 if (worker != null) {
867 this.workerNodes[this.getWorkerNodeKey(worker)].info.started =
868 message.started as boolean
869 } else {
870 throw new Error(
871 `Worker started message received from unknown worker '${
872 message.workerId as number
873 }'`
874 )
875 }
876 }
877
878 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
879 const promiseResponse = this.promiseResponseMap.get(message.id as string)
880 if (promiseResponse != null) {
881 if (message.taskError != null) {
882 if (this.emitter != null) {
883 this.emitter.emit(PoolEvents.taskError, message.taskError)
be0676b3 884 }
6b272951
JB
885 promiseResponse.reject(message.taskError.message)
886 } else {
887 promiseResponse.resolve(message.data as Response)
888 }
889 this.afterTaskExecutionHook(promiseResponse.worker, message)
890 this.promiseResponseMap.delete(message.id as string)
891 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
892 if (
893 this.opts.enableTasksQueue === true &&
894 this.tasksQueueSize(workerNodeKey) > 0
895 ) {
896 this.executeTask(
897 workerNodeKey,
898 this.dequeueTask(workerNodeKey) as Task<Data>
899 )
be0676b3 900 }
6b272951 901 this.workerChoiceStrategyContext.update(workerNodeKey)
be0676b3 902 }
be0676b3 903 }
7c0ba920 904
ff733df7 905 private checkAndEmitEvents (): void {
1f68cede 906 if (this.emitter != null) {
ff733df7 907 if (this.busy) {
6b27d407 908 this.emitter?.emit(PoolEvents.busy, this.info)
ff733df7 909 }
6b27d407
JB
910 if (this.type === PoolTypes.dynamic && this.full) {
911 this.emitter?.emit(PoolEvents.full, this.info)
ff733df7 912 }
164d950a
JB
913 }
914 }
915
0ebe2a9f
JB
916 /**
917 * Sets the given worker node its tasks usage in the pool.
918 *
919 * @param workerNode - The worker node.
a4e07f72 920 * @param workerUsage - The worker usage.
0ebe2a9f
JB
921 */
922 private setWorkerNodeTasksUsage (
923 workerNode: WorkerNode<Worker, Data>,
a4e07f72 924 workerUsage: WorkerUsage
0ebe2a9f 925 ): void {
f59e1027 926 workerNode.usage = workerUsage
0ebe2a9f
JB
927 }
928
a05c10de 929 /**
f06e48d8 930 * Pushes the given worker in the pool worker nodes.
ea7a90d3 931 *
38e795c1 932 * @param worker - The worker.
f06e48d8 933 * @returns The worker nodes length.
ea7a90d3 934 */
f06e48d8 935 private pushWorkerNode (worker: Worker): number {
9c16fb4b 936 this.workerNodes.push({
ffcbbad8 937 worker,
7ae6fb74 938 info: { id: this.getWorkerId(worker), started: true },
f59e1027 939 usage: this.getWorkerUsage(),
29ee7e9a 940 tasksQueue: new Queue<Task<Data>>()
ea7a90d3 941 })
9c16fb4b
JB
942 const workerNodeKey = this.getWorkerNodeKey(worker)
943 this.setWorkerNodeTasksUsage(
944 this.workerNodes[workerNodeKey],
945 this.getWorkerUsage(workerNodeKey)
946 )
947 return this.workerNodes.length
ea7a90d3 948 }
c923ce56 949
83fa0a36
JB
950 /**
951 * Gets the worker id.
952 *
953 * @param worker - The worker.
954 * @returns The worker id.
955 */
956 private getWorkerId (worker: Worker): number | undefined {
957 if (this.worker === WorkerTypes.thread) {
958 return worker.threadId
959 } else if (this.worker === WorkerTypes.cluster) {
960 return worker.id
961 }
962 }
963
8604aaab
JB
964 // /**
965 // * Sets the given worker in the pool worker nodes.
966 // *
967 // * @param workerNodeKey - The worker node key.
968 // * @param worker - The worker.
f59e1027 969 // * @param workerInfo - The worker info.
8604aaab
JB
970 // * @param workerUsage - The worker usage.
971 // * @param tasksQueue - The worker task queue.
972 // */
973 // private setWorkerNode (
974 // workerNodeKey: number,
975 // worker: Worker,
f59e1027 976 // workerInfo: WorkerInfo,
8604aaab
JB
977 // workerUsage: WorkerUsage,
978 // tasksQueue: Queue<Task<Data>>
979 // ): void {
980 // this.workerNodes[workerNodeKey] = {
981 // worker,
f59e1027
JB
982 // info: workerInfo,
983 // usage: workerUsage,
8604aaab
JB
984 // tasksQueue
985 // }
986 // }
51fe3d3c
JB
987
988 /**
f06e48d8 989 * Removes the given worker from the pool worker nodes.
51fe3d3c 990 *
f06e48d8 991 * @param worker - The worker.
51fe3d3c 992 */
416fd65c 993 private removeWorkerNode (worker: Worker): void {
f06e48d8 994 const workerNodeKey = this.getWorkerNodeKey(worker)
1f68cede
JB
995 if (workerNodeKey !== -1) {
996 this.workerNodes.splice(workerNodeKey, 1)
997 this.workerChoiceStrategyContext.remove(workerNodeKey)
998 }
51fe3d3c 999 }
adc3c320 1000
2e81254d 1001 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1002 this.beforeTaskExecutionHook(workerNodeKey, task)
2e81254d
JB
1003 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
1004 }
1005
f9f00b5f 1006 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
29ee7e9a 1007 return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
adc3c320
JB
1008 }
1009
416fd65c 1010 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
29ee7e9a 1011 return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
adc3c320
JB
1012 }
1013
416fd65c 1014 private tasksQueueSize (workerNodeKey: number): number {
4d8bf9e4 1015 return this.workerNodes[workerNodeKey].tasksQueue.size
adc3c320 1016 }
ff733df7 1017
df593701
JB
1018 private tasksMaxQueueSize (workerNodeKey: number): number {
1019 return this.workerNodes[workerNodeKey].tasksQueue.maxSize
1020 }
1021
416fd65c 1022 private flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1023 while (this.tasksQueueSize(workerNodeKey) > 0) {
1024 this.executeTask(
1025 workerNodeKey,
1026 this.dequeueTask(workerNodeKey) as Task<Data>
1027 )
ff733df7 1028 }
df593701 1029 this.workerNodes[workerNodeKey].tasksQueue.clear()
ff733df7
JB
1030 }
1031
ef41a6e6
JB
1032 private flushTasksQueues (): void {
1033 for (const [workerNodeKey] of this.workerNodes.entries()) {
1034 this.flushTasksQueue(workerNodeKey)
1035 }
1036 }
b6b32453
JB
1037
1038 private setWorkerStatistics (worker: Worker): void {
1039 this.sendToWorker(worker, {
1040 statistics: {
87de9ff5
JB
1041 runTime:
1042 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 1043 .runTime.aggregate,
87de9ff5 1044 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
5df69fab 1045 .elu.aggregate
b6b32453
JB
1046 }
1047 })
1048 }
8604aaab 1049
9c16fb4b 1050 private getWorkerUsage (workerNodeKey?: number): WorkerUsage {
e3347a5c
JB
1051 const getTasksQueueSize = (workerNodeKey?: number): number => {
1052 return workerNodeKey != null ? this.tasksQueueSize(workerNodeKey) : 0
9c16fb4b 1053 }
df593701
JB
1054 const getTasksMaxQueueSize = (workerNodeKey?: number): number => {
1055 return workerNodeKey != null ? this.tasksMaxQueueSize(workerNodeKey) : 0
1056 }
8604aaab 1057 return {
9c16fb4b
JB
1058 tasks: {
1059 executed: 0,
1060 executing: 0,
1061 get queued (): number {
e3347a5c 1062 return getTasksQueueSize(workerNodeKey)
9c16fb4b 1063 },
df593701
JB
1064 get maxQueued (): number {
1065 return getTasksMaxQueueSize(workerNodeKey)
1066 },
9c16fb4b
JB
1067 failed: 0
1068 },
8604aaab 1069 runTime: {
932fc8be 1070 aggregate: 0,
8604aaab
JB
1071 average: 0,
1072 median: 0,
1073 history: new CircularArray()
1074 },
1075 waitTime: {
932fc8be 1076 aggregate: 0,
8604aaab
JB
1077 average: 0,
1078 median: 0,
1079 history: new CircularArray()
1080 },
5df69fab
JB
1081 elu: {
1082 idle: {
1083 aggregate: 0,
1084 average: 0,
1085 median: 0,
1086 history: new CircularArray()
1087 },
1088 active: {
1089 aggregate: 0,
1090 average: 0,
1091 median: 0,
1092 history: new CircularArray()
1093 },
1094 utilization: 0
1095 }
8604aaab
JB
1096 }
1097 }
c97c7edb 1098}