Merge branch 'master' of github.com:jerome-benoit/poolifier
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
fc3e6586 1import crypto from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
2740a743 3import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
4import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
59317253 7 isKillBehavior,
0d80593b 8 isPlainObject,
afe0d5bf
JB
9 median,
10 round
bbeadd16 11} from '../utils'
59317253 12import { KillBehaviors } from '../worker/worker-options'
65d7a1c9 13import { CircularArray } from '../circular-array'
29ee7e9a 14import { Queue } from '../queue'
c4855468 15import {
65d7a1c9 16 type IPool,
7c5a1080 17 PoolEmitter,
c4855468 18 PoolEvents,
6b27d407 19 type PoolInfo,
c4855468 20 type PoolOptions,
6b27d407
JB
21 type PoolType,
22 PoolTypes,
184855e6 23 type TasksQueueOptions,
83fa0a36
JB
24 type WorkerType,
25 WorkerTypes
c4855468 26} from './pool'
e102732c
JB
27import type {
28 IWorker,
29 MessageHandler,
30 Task,
31 WorkerNode,
32 WorkerUsage
33} from './worker'
a35560ba 34import {
f0d7f803 35 Measurements,
a35560ba 36 WorkerChoiceStrategies,
a20f0ba5
JB
37 type WorkerChoiceStrategy,
38 type WorkerChoiceStrategyOptions
bdaf31cd
JB
39} from './selection-strategies/selection-strategies-types'
40import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
c97c7edb 41
729c563d 42/**
ea7a90d3 43 * Base class that implements some shared logic for all poolifier pools.
729c563d 44 *
38e795c1 45 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
46 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
47 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 48 */
c97c7edb 49export abstract class AbstractPool<
f06e48d8 50 Worker extends IWorker,
d3c8a1a8
S
51 Data = unknown,
52 Response = unknown
c4855468 53> implements IPool<Worker, Data, Response> {
afc003b2 54 /** @inheritDoc */
f06e48d8 55 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
4a6952ff 56
afc003b2 57 /** @inheritDoc */
7c0ba920
JB
58 public readonly emitter?: PoolEmitter
59
be0676b3 60 /**
a3445496 61 * The execution response promise map.
be0676b3 62 *
2740a743 63 * - `key`: The message id of each submitted task.
a3445496 64 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 65 *
a3445496 66 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 67 */
c923ce56
JB
68 protected promiseResponseMap: Map<
69 string,
70 PromiseResponseWrapper<Worker, Response>
71 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 72
a35560ba 73 /**
51fe3d3c 74 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
75 */
76 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
77 Worker,
78 Data,
79 Response
a35560ba
S
80 >
81
afe0d5bf
JB
82 /**
83 * The start timestamp of the pool.
84 */
85 private readonly startTimestamp
86
729c563d
S
87 /**
88 * Constructs a new poolifier pool.
89 *
38e795c1 90 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 91 * @param filePath - Path to the worker file.
38e795c1 92 * @param opts - Options for the pool.
729c563d 93 */
c97c7edb 94 public constructor (
b4213b7f
JB
95 protected readonly numberOfWorkers: number,
96 protected readonly filePath: string,
97 protected readonly opts: PoolOptions<Worker>
c97c7edb 98 ) {
78cea37e 99 if (!this.isMain()) {
c97c7edb
S
100 throw new Error('Cannot start a pool from a worker!')
101 }
8d3782fa 102 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 103 this.checkFilePath(this.filePath)
7c0ba920 104 this.checkPoolOptions(this.opts)
1086026a 105
7254e419
JB
106 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
107 this.executeTask = this.executeTask.bind(this)
108 this.enqueueTask = this.enqueueTask.bind(this)
109 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
1086026a 110
6bd72cd0 111 if (this.opts.enableEvents === true) {
7c0ba920
JB
112 this.emitter = new PoolEmitter()
113 }
d59df138
JB
114 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
115 Worker,
116 Data,
117 Response
da309861
JB
118 >(
119 this,
120 this.opts.workerChoiceStrategy,
121 this.opts.workerChoiceStrategyOptions
122 )
b6b32453
JB
123
124 this.setupHook()
125
afe0d5bf 126 while (this.workerNodes.length < this.numberOfWorkers) {
b6b32453
JB
127 this.createAndSetupWorker()
128 }
afe0d5bf
JB
129
130 this.startTimestamp = performance.now()
c97c7edb
S
131 }
132
a35560ba 133 private checkFilePath (filePath: string): void {
ffcbbad8
JB
134 if (
135 filePath == null ||
136 (typeof filePath === 'string' && filePath.trim().length === 0)
137 ) {
c510fea7
APA
138 throw new Error('Please specify a file with a worker implementation')
139 }
140 }
141
8d3782fa
JB
142 private checkNumberOfWorkers (numberOfWorkers: number): void {
143 if (numberOfWorkers == null) {
144 throw new Error(
145 'Cannot instantiate a pool without specifying the number of workers'
146 )
78cea37e 147 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 148 throw new TypeError(
0d80593b 149 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
150 )
151 } else if (numberOfWorkers < 0) {
473c717a 152 throw new RangeError(
8d3782fa
JB
153 'Cannot instantiate a pool with a negative number of workers'
154 )
6b27d407 155 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
8d3782fa
JB
156 throw new Error('Cannot instantiate a fixed pool with no worker')
157 }
158 }
159
7c0ba920 160 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
161 if (isPlainObject(opts)) {
162 this.opts.workerChoiceStrategy =
163 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
164 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
165 this.opts.workerChoiceStrategyOptions =
166 opts.workerChoiceStrategyOptions ??
167 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
49be33fe
JB
168 this.checkValidWorkerChoiceStrategyOptions(
169 this.opts.workerChoiceStrategyOptions
170 )
1f68cede 171 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
172 this.opts.enableEvents = opts.enableEvents ?? true
173 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
174 if (this.opts.enableTasksQueue) {
175 this.checkValidTasksQueueOptions(
176 opts.tasksQueueOptions as TasksQueueOptions
177 )
178 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
179 opts.tasksQueueOptions as TasksQueueOptions
180 )
181 }
182 } else {
183 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 184 }
aee46736
JB
185 }
186
187 private checkValidWorkerChoiceStrategy (
188 workerChoiceStrategy: WorkerChoiceStrategy
189 ): void {
190 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 191 throw new Error(
aee46736 192 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
193 )
194 }
7c0ba920
JB
195 }
196
0d80593b
JB
197 private checkValidWorkerChoiceStrategyOptions (
198 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
199 ): void {
200 if (!isPlainObject(workerChoiceStrategyOptions)) {
201 throw new TypeError(
202 'Invalid worker choice strategy options: must be a plain object'
203 )
204 }
49be33fe
JB
205 if (
206 workerChoiceStrategyOptions.weights != null &&
6b27d407 207 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
208 ) {
209 throw new Error(
210 'Invalid worker choice strategy options: must have a weight for each worker node'
211 )
212 }
f0d7f803
JB
213 if (
214 workerChoiceStrategyOptions.measurement != null &&
215 !Object.values(Measurements).includes(
216 workerChoiceStrategyOptions.measurement
217 )
218 ) {
219 throw new Error(
220 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
221 )
222 }
0d80593b
JB
223 }
224
a20f0ba5
JB
225 private checkValidTasksQueueOptions (
226 tasksQueueOptions: TasksQueueOptions
227 ): void {
0d80593b
JB
228 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
229 throw new TypeError('Invalid tasks queue options: must be a plain object')
230 }
f0d7f803
JB
231 if (
232 tasksQueueOptions?.concurrency != null &&
233 !Number.isSafeInteger(tasksQueueOptions.concurrency)
234 ) {
235 throw new TypeError(
236 'Invalid worker tasks concurrency: must be an integer'
237 )
238 }
239 if (
240 tasksQueueOptions?.concurrency != null &&
241 tasksQueueOptions.concurrency <= 0
242 ) {
a20f0ba5 243 throw new Error(
f0d7f803 244 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
a20f0ba5
JB
245 )
246 }
247 }
248
08f3f44c 249 /** @inheritDoc */
6b27d407
JB
250 public get info (): PoolInfo {
251 return {
252 type: this.type,
184855e6 253 worker: this.worker,
6b27d407
JB
254 minSize: this.minSize,
255 maxSize: this.maxSize,
c05f0d50
JB
256 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
257 .runTime.aggregate &&
1305e9a8
JB
258 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
259 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
260 workerNodes: this.workerNodes.length,
261 idleWorkerNodes: this.workerNodes.reduce(
262 (accumulator, workerNode) =>
f59e1027 263 workerNode.usage.tasks.executing === 0
a4e07f72
JB
264 ? accumulator + 1
265 : accumulator,
6b27d407
JB
266 0
267 ),
268 busyWorkerNodes: this.workerNodes.reduce(
269 (accumulator, workerNode) =>
f59e1027 270 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
271 0
272 ),
a4e07f72 273 executedTasks: this.workerNodes.reduce(
6b27d407 274 (accumulator, workerNode) =>
f59e1027 275 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
276 0
277 ),
278 executingTasks: this.workerNodes.reduce(
279 (accumulator, workerNode) =>
f59e1027 280 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
281 0
282 ),
283 queuedTasks: this.workerNodes.reduce(
df593701 284 (accumulator, workerNode) =>
f59e1027 285 accumulator + workerNode.usage.tasks.queued,
6b27d407
JB
286 0
287 ),
288 maxQueuedTasks: this.workerNodes.reduce(
289 (accumulator, workerNode) =>
f59e1027 290 accumulator + workerNode.usage.tasks.maxQueued,
6b27d407 291 0
a4e07f72
JB
292 ),
293 failedTasks: this.workerNodes.reduce(
294 (accumulator, workerNode) =>
f59e1027 295 accumulator + workerNode.usage.tasks.failed,
a4e07f72 296 0
6b27d407
JB
297 )
298 }
299 }
08f3f44c 300
afe0d5bf
JB
301 /**
302 * Gets the pool run time.
303 *
304 * @returns The pool run time in milliseconds.
305 */
306 private get runTime (): number {
307 return performance.now() - this.startTimestamp
308 }
309
310 /**
311 * Gets the approximate pool utilization.
312 *
313 * @returns The pool utilization.
314 */
315 private get utilization (): number {
316 const poolRunTimeCapacity = this.runTime * this.maxSize
317 const totalTasksRunTime = this.workerNodes.reduce(
318 (accumulator, workerNode) =>
319 accumulator + workerNode.usage.runTime.aggregate,
320 0
321 )
322 const totalTasksWaitTime = this.workerNodes.reduce(
323 (accumulator, workerNode) =>
324 accumulator + workerNode.usage.waitTime.aggregate,
325 0
326 )
327 return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
328 }
329
8881ae32
JB
330 /**
331 * Pool type.
332 *
333 * If it is `'dynamic'`, it provides the `max` property.
334 */
335 protected abstract get type (): PoolType
336
184855e6
JB
337 /**
338 * Gets the worker type.
339 */
340 protected abstract get worker (): WorkerType
341
c2ade475 342 /**
6b27d407 343 * Pool minimum size.
c2ade475 344 */
6b27d407 345 protected abstract get minSize (): number
ff733df7
JB
346
347 /**
6b27d407 348 * Pool maximum size.
ff733df7 349 */
6b27d407 350 protected abstract get maxSize (): number
a35560ba 351
f59e1027
JB
352 /**
353 * Get the worker given its id.
354 *
355 * @param workerId - The worker id.
356 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
357 */
358 private getWorkerById (workerId: number): Worker | undefined {
359 return this.workerNodes.find(workerNode => workerNode.info.id === workerId)
360 ?.worker
361 }
362
ffcbbad8 363 /**
f06e48d8 364 * Gets the given worker its worker node key.
ffcbbad8
JB
365 *
366 * @param worker - The worker.
f59e1027 367 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 368 */
f06e48d8
JB
369 private getWorkerNodeKey (worker: Worker): number {
370 return this.workerNodes.findIndex(
371 workerNode => workerNode.worker === worker
372 )
bf9549ae
JB
373 }
374
afc003b2 375 /** @inheritDoc */
a35560ba 376 public setWorkerChoiceStrategy (
59219cbb
JB
377 workerChoiceStrategy: WorkerChoiceStrategy,
378 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 379 ): void {
aee46736 380 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 381 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
382 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
383 this.opts.workerChoiceStrategy
384 )
385 if (workerChoiceStrategyOptions != null) {
386 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
387 }
9c16fb4b 388 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
8604aaab
JB
389 this.setWorkerNodeTasksUsage(
390 workerNode,
9c16fb4b 391 this.getWorkerUsage(workerNodeKey)
8604aaab 392 )
b6b32453 393 this.setWorkerStatistics(workerNode.worker)
59219cbb 394 }
a20f0ba5
JB
395 }
396
397 /** @inheritDoc */
398 public setWorkerChoiceStrategyOptions (
399 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
400 ): void {
0d80593b 401 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
a20f0ba5
JB
402 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
403 this.workerChoiceStrategyContext.setOptions(
404 this.opts.workerChoiceStrategyOptions
a35560ba
S
405 )
406 }
407
a20f0ba5 408 /** @inheritDoc */
8f52842f
JB
409 public enableTasksQueue (
410 enable: boolean,
411 tasksQueueOptions?: TasksQueueOptions
412 ): void {
a20f0ba5 413 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 414 this.flushTasksQueues()
a20f0ba5
JB
415 }
416 this.opts.enableTasksQueue = enable
8f52842f 417 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
418 }
419
420 /** @inheritDoc */
8f52842f 421 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 422 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
423 this.checkValidTasksQueueOptions(tasksQueueOptions)
424 this.opts.tasksQueueOptions =
425 this.buildTasksQueueOptions(tasksQueueOptions)
5baee0d7 426 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
427 delete this.opts.tasksQueueOptions
428 }
429 }
430
431 private buildTasksQueueOptions (
432 tasksQueueOptions: TasksQueueOptions
433 ): TasksQueueOptions {
434 return {
435 concurrency: tasksQueueOptions?.concurrency ?? 1
436 }
437 }
438
c319c66b
JB
439 /**
440 * Whether the pool is full or not.
441 *
442 * The pool filling boolean status.
443 */
dea903a8
JB
444 protected get full (): boolean {
445 return this.workerNodes.length >= this.maxSize
446 }
c2ade475 447
c319c66b
JB
448 /**
449 * Whether the pool is busy or not.
450 *
451 * The pool busyness boolean status.
452 */
453 protected abstract get busy (): boolean
7c0ba920 454
6c6afb84
JB
455 /**
456 * Whether worker nodes are executing at least one task.
457 *
458 * @returns Worker nodes busyness boolean status.
459 */
c2ade475 460 protected internalBusy (): boolean {
e0ae6100
JB
461 return (
462 this.workerNodes.findIndex(workerNode => {
f59e1027 463 return workerNode.usage.tasks.executing === 0
e0ae6100
JB
464 }) === -1
465 )
cb70b19d
JB
466 }
467
afc003b2 468 /** @inheritDoc */
a86b6df1 469 public async execute (data?: Data, name?: string): Promise<Response> {
b6b32453 470 const timestamp = performance.now()
20dcad1a 471 const workerNodeKey = this.chooseWorkerNode()
adc3c320 472 const submittedTask: Task<Data> = {
a86b6df1 473 name,
e5a5c0fc
JB
474 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
475 data: data ?? ({} as Data),
b6b32453 476 timestamp,
adc3c320
JB
477 id: crypto.randomUUID()
478 }
2e81254d 479 const res = new Promise<Response>((resolve, reject) => {
02706357 480 this.promiseResponseMap.set(submittedTask.id as string, {
2e81254d
JB
481 resolve,
482 reject,
20dcad1a 483 worker: this.workerNodes[workerNodeKey].worker
2e81254d
JB
484 })
485 })
ff733df7
JB
486 if (
487 this.opts.enableTasksQueue === true &&
7171d33f 488 (this.busy ||
f59e1027 489 this.workerNodes[workerNodeKey].usage.tasks.executing >=
7171d33f 490 ((this.opts.tasksQueueOptions as TasksQueueOptions)
3528c992 491 .concurrency as number))
ff733df7 492 ) {
26a929d7
JB
493 this.enqueueTask(workerNodeKey, submittedTask)
494 } else {
2e81254d 495 this.executeTask(workerNodeKey, submittedTask)
adc3c320 496 }
ff733df7 497 this.checkAndEmitEvents()
78cea37e 498 // eslint-disable-next-line @typescript-eslint/return-await
280c2a77
S
499 return res
500 }
c97c7edb 501
afc003b2 502 /** @inheritDoc */
c97c7edb 503 public async destroy (): Promise<void> {
1fbcaa7c 504 await Promise.all(
875a7c37
JB
505 this.workerNodes.map(async (workerNode, workerNodeKey) => {
506 this.flushTasksQueue(workerNodeKey)
47aacbaa 507 // FIXME: wait for tasks to be finished
f06e48d8 508 await this.destroyWorker(workerNode.worker)
1fbcaa7c
JB
509 })
510 )
c97c7edb
S
511 }
512
4a6952ff 513 /**
6c6afb84 514 * Terminates the given worker.
4a6952ff 515 *
f06e48d8 516 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
517 */
518 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 519
729c563d 520 /**
6677a3d3
JB
521 * Setup hook to execute code before worker nodes are created in the abstract constructor.
522 * Can be overridden.
afc003b2
JB
523 *
524 * @virtual
729c563d 525 */
280c2a77 526 protected setupHook (): void {
d99ba5a8 527 // Intentionally empty
280c2a77 528 }
c97c7edb 529
729c563d 530 /**
280c2a77
S
531 * Should return whether the worker is the main worker or not.
532 */
533 protected abstract isMain (): boolean
534
535 /**
2e81254d 536 * Hook executed before the worker task execution.
bf9549ae 537 * Can be overridden.
729c563d 538 *
f06e48d8 539 * @param workerNodeKey - The worker node key.
1c6fe997 540 * @param task - The task to execute.
729c563d 541 */
1c6fe997
JB
542 protected beforeTaskExecutionHook (
543 workerNodeKey: number,
544 task: Task<Data>
545 ): void {
f59e1027 546 const workerUsage = this.workerNodes[workerNodeKey].usage
1c6fe997
JB
547 ++workerUsage.tasks.executing
548 this.updateWaitTimeWorkerUsage(workerUsage, task)
c97c7edb
S
549 }
550
c01733f1 551 /**
2e81254d 552 * Hook executed after the worker task execution.
bf9549ae 553 * Can be overridden.
c01733f1 554 *
c923ce56 555 * @param worker - The worker.
38e795c1 556 * @param message - The received message.
c01733f1 557 */
2e81254d 558 protected afterTaskExecutionHook (
c923ce56 559 worker: Worker,
2740a743 560 message: MessageValue<Response>
bf9549ae 561 ): void {
f59e1027 562 const workerUsage = this.workerNodes[this.getWorkerNodeKey(worker)].usage
f1c06930
JB
563 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
564 this.updateRunTimeWorkerUsage(workerUsage, message)
565 this.updateEluWorkerUsage(workerUsage, message)
566 }
567
568 private updateTaskStatisticsWorkerUsage (
569 workerUsage: WorkerUsage,
570 message: MessageValue<Response>
571 ): void {
a4e07f72
JB
572 const workerTaskStatistics = workerUsage.tasks
573 --workerTaskStatistics.executing
574 ++workerTaskStatistics.executed
82f36766 575 if (message.taskError != null) {
a4e07f72 576 ++workerTaskStatistics.failed
2740a743 577 }
f8eb0a2a
JB
578 }
579
a4e07f72
JB
580 private updateRunTimeWorkerUsage (
581 workerUsage: WorkerUsage,
f8eb0a2a
JB
582 message: MessageValue<Response>
583 ): void {
87de9ff5
JB
584 if (
585 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
932fc8be 586 .aggregate
87de9ff5 587 ) {
932fc8be 588 workerUsage.runTime.aggregate += message.taskPerformance?.runTime ?? 0
c6bd2650 589 if (
932fc8be
JB
590 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
591 .average &&
a4e07f72 592 workerUsage.tasks.executed !== 0
c6bd2650 593 ) {
a4e07f72 594 workerUsage.runTime.average =
f1c06930
JB
595 workerUsage.runTime.aggregate /
596 (workerUsage.tasks.executed - workerUsage.tasks.failed)
3032893a 597 }
3fa4cdd2 598 if (
932fc8be
JB
599 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
600 .median &&
d715b7bc 601 message.taskPerformance?.runTime != null
3fa4cdd2 602 ) {
a4e07f72
JB
603 workerUsage.runTime.history.push(message.taskPerformance.runTime)
604 workerUsage.runTime.median = median(workerUsage.runTime.history)
78099a15 605 }
3032893a 606 }
f8eb0a2a
JB
607 }
608
a4e07f72
JB
609 private updateWaitTimeWorkerUsage (
610 workerUsage: WorkerUsage,
1c6fe997 611 task: Task<Data>
f8eb0a2a 612 ): void {
1c6fe997
JB
613 const timestamp = performance.now()
614 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
87de9ff5
JB
615 if (
616 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
932fc8be 617 .aggregate
87de9ff5 618 ) {
932fc8be 619 workerUsage.waitTime.aggregate += taskWaitTime ?? 0
09a6305f 620 if (
87de9ff5 621 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 622 .waitTime.average &&
a4e07f72 623 workerUsage.tasks.executed !== 0
09a6305f 624 ) {
a4e07f72 625 workerUsage.waitTime.average =
f1c06930
JB
626 workerUsage.waitTime.aggregate /
627 (workerUsage.tasks.executed - workerUsage.tasks.failed)
09a6305f
JB
628 }
629 if (
87de9ff5 630 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 631 .waitTime.median &&
1c6fe997 632 taskWaitTime != null
09a6305f 633 ) {
1c6fe997 634 workerUsage.waitTime.history.push(taskWaitTime)
a4e07f72 635 workerUsage.waitTime.median = median(workerUsage.waitTime.history)
09a6305f 636 }
0567595a 637 }
c01733f1 638 }
639
a4e07f72 640 private updateEluWorkerUsage (
5df69fab 641 workerUsage: WorkerUsage,
62c15a68
JB
642 message: MessageValue<Response>
643 ): void {
5df69fab
JB
644 if (
645 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
646 .aggregate
647 ) {
648 if (workerUsage.elu != null && message.taskPerformance?.elu != null) {
9adcefab
JB
649 workerUsage.elu.idle.aggregate += message.taskPerformance.elu.idle
650 workerUsage.elu.active.aggregate += message.taskPerformance.elu.active
5df69fab
JB
651 workerUsage.elu.utilization =
652 (workerUsage.elu.utilization +
653 message.taskPerformance.elu.utilization) /
654 2
655 } else if (message.taskPerformance?.elu != null) {
656 workerUsage.elu.idle.aggregate = message.taskPerformance.elu.idle
657 workerUsage.elu.active.aggregate = message.taskPerformance.elu.active
658 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
659 }
d715b7bc 660 if (
5df69fab
JB
661 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
662 .average &&
663 workerUsage.tasks.executed !== 0
664 ) {
f1c06930
JB
665 const executedTasks =
666 workerUsage.tasks.executed - workerUsage.tasks.failed
5df69fab 667 workerUsage.elu.idle.average =
f1c06930 668 workerUsage.elu.idle.aggregate / executedTasks
5df69fab 669 workerUsage.elu.active.average =
f1c06930 670 workerUsage.elu.active.aggregate / executedTasks
5df69fab
JB
671 }
672 if (
673 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
674 .median &&
d715b7bc
JB
675 message.taskPerformance?.elu != null
676 ) {
5df69fab
JB
677 workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
678 workerUsage.elu.active.history.push(message.taskPerformance.elu.active)
679 workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
680 workerUsage.elu.active.median = median(workerUsage.elu.active.history)
62c15a68
JB
681 }
682 }
683 }
684
280c2a77 685 /**
f06e48d8 686 * Chooses a worker node for the next task.
280c2a77 687 *
6c6afb84 688 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 689 *
20dcad1a 690 * @returns The worker node key
280c2a77 691 */
6c6afb84 692 private chooseWorkerNode (): number {
930dcf12 693 if (this.shallCreateDynamicWorker()) {
6c6afb84
JB
694 const worker = this.createAndSetupDynamicWorker()
695 if (
696 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
697 ) {
698 return this.getWorkerNodeKey(worker)
699 }
17393ac8 700 }
930dcf12
JB
701 return this.workerChoiceStrategyContext.execute()
702 }
703
6c6afb84
JB
704 /**
705 * Conditions for dynamic worker creation.
706 *
707 * @returns Whether to create a dynamic worker or not.
708 */
709 private shallCreateDynamicWorker (): boolean {
930dcf12 710 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
711 }
712
280c2a77 713 /**
675bb809 714 * Sends a message to the given worker.
280c2a77 715 *
38e795c1
JB
716 * @param worker - The worker which should receive the message.
717 * @param message - The message.
280c2a77
S
718 */
719 protected abstract sendToWorker (
720 worker: Worker,
721 message: MessageValue<Data>
722 ): void
723
4a6952ff 724 /**
f06e48d8 725 * Registers a listener callback on the given worker.
4a6952ff 726 *
38e795c1
JB
727 * @param worker - The worker which should register a listener.
728 * @param listener - The message listener callback.
4a6952ff 729 */
e102732c
JB
730 private registerWorkerMessageListener<Message extends Data | Response>(
731 worker: Worker,
732 listener: (message: MessageValue<Message>) => void
733 ): void {
734 worker.on('message', listener as MessageHandler<Worker>)
735 }
c97c7edb 736
729c563d 737 /**
41344292 738 * Creates a new worker.
6c6afb84
JB
739 *
740 * @returns Newly created worker.
729c563d 741 */
280c2a77 742 protected abstract createWorker (): Worker
c97c7edb 743
729c563d 744 /**
f06e48d8 745 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
6677a3d3 746 * Can be overridden.
729c563d 747 *
38e795c1 748 * @param worker - The newly created worker.
729c563d 749 */
6677a3d3 750 protected afterWorkerSetup (worker: Worker): void {
e102732c
JB
751 // Listen to worker messages.
752 this.registerWorkerMessageListener(worker, this.workerListener())
753 }
c97c7edb 754
4a6952ff 755 /**
f06e48d8 756 * Creates a new worker and sets it up completely in the pool worker nodes.
4a6952ff
JB
757 *
758 * @returns New, completely set up worker.
759 */
760 protected createAndSetupWorker (): Worker {
bdacc2d2 761 const worker = this.createWorker()
280c2a77 762
35cf1c03 763 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 764 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1f68cede
JB
765 worker.on('error', error => {
766 if (this.emitter != null) {
767 this.emitter.emit(PoolEvents.error, error)
768 }
e8a4c3ea 769 if (this.opts.restartWorkerOnError === true) {
1f68cede 770 this.createAndSetupWorker()
5baee0d7
JB
771 }
772 })
a35560ba
S
773 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
774 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 775 worker.once('exit', () => {
f06e48d8 776 this.removeWorkerNode(worker)
a974afa6 777 })
280c2a77 778
f06e48d8 779 this.pushWorkerNode(worker)
280c2a77 780
b6b32453
JB
781 this.setWorkerStatistics(worker)
782
280c2a77
S
783 this.afterWorkerSetup(worker)
784
c97c7edb
S
785 return worker
786 }
be0676b3 787
930dcf12
JB
788 /**
789 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
790 *
791 * @returns New, completely set up dynamic worker.
792 */
793 protected createAndSetupDynamicWorker (): Worker {
794 const worker = this.createAndSetupWorker()
795 this.registerWorkerMessageListener(worker, message => {
e8b3a5ab 796 const workerNodeKey = this.getWorkerNodeKey(worker)
930dcf12
JB
797 if (
798 isKillBehavior(KillBehaviors.HARD, message.kill) ||
7b56f532
JB
799 (message.kill != null &&
800 ((this.opts.enableTasksQueue === false &&
f59e1027 801 this.workerNodes[workerNodeKey].usage.tasks.executing === 0) ||
7b56f532 802 (this.opts.enableTasksQueue === true &&
f59e1027 803 this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
e8b3a5ab 804 this.tasksQueueSize(workerNodeKey) === 0)))
930dcf12
JB
805 ) {
806 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
930dcf12
JB
807 void (this.destroyWorker(worker) as Promise<void>)
808 }
809 })
810 return worker
811 }
812
be0676b3 813 /**
ff733df7 814 * This function is the listener registered for each worker message.
be0676b3 815 *
bdacc2d2 816 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
817 */
818 protected workerListener (): (message: MessageValue<Response>) => void {
4a6952ff 819 return message => {
f59e1027
JB
820 if (message.workerId != null && message.started != null) {
821 // Worker started message received
6b272951 822 this.handleWorkerStartedMessage(message)
f59e1027 823 } else if (message.id != null) {
a3445496 824 // Task execution response received
6b272951
JB
825 this.handleTaskExecutionResponse(message)
826 }
827 }
828 }
829
830 private handleWorkerStartedMessage (message: MessageValue<Response>): void {
831 // Worker started message received
832 const worker = this.getWorkerById(message.workerId as number)
833 if (worker != null) {
834 this.workerNodes[this.getWorkerNodeKey(worker)].info.started =
835 message.started as boolean
836 } else {
837 throw new Error(
838 `Worker started message received from unknown worker '${
839 message.workerId as number
840 }'`
841 )
842 }
843 }
844
845 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
846 const promiseResponse = this.promiseResponseMap.get(message.id as string)
847 if (promiseResponse != null) {
848 if (message.taskError != null) {
849 if (this.emitter != null) {
850 this.emitter.emit(PoolEvents.taskError, message.taskError)
be0676b3 851 }
6b272951
JB
852 promiseResponse.reject(message.taskError.message)
853 } else {
854 promiseResponse.resolve(message.data as Response)
855 }
856 this.afterTaskExecutionHook(promiseResponse.worker, message)
857 this.promiseResponseMap.delete(message.id as string)
858 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
859 if (
860 this.opts.enableTasksQueue === true &&
861 this.tasksQueueSize(workerNodeKey) > 0
862 ) {
863 this.executeTask(
864 workerNodeKey,
865 this.dequeueTask(workerNodeKey) as Task<Data>
866 )
be0676b3 867 }
6b272951 868 this.workerChoiceStrategyContext.update(workerNodeKey)
be0676b3 869 }
be0676b3 870 }
7c0ba920 871
ff733df7 872 private checkAndEmitEvents (): void {
1f68cede 873 if (this.emitter != null) {
ff733df7 874 if (this.busy) {
6b27d407 875 this.emitter?.emit(PoolEvents.busy, this.info)
ff733df7 876 }
6b27d407
JB
877 if (this.type === PoolTypes.dynamic && this.full) {
878 this.emitter?.emit(PoolEvents.full, this.info)
ff733df7 879 }
164d950a
JB
880 }
881 }
882
0ebe2a9f
JB
883 /**
884 * Sets the given worker node its tasks usage in the pool.
885 *
886 * @param workerNode - The worker node.
a4e07f72 887 * @param workerUsage - The worker usage.
0ebe2a9f
JB
888 */
889 private setWorkerNodeTasksUsage (
890 workerNode: WorkerNode<Worker, Data>,
a4e07f72 891 workerUsage: WorkerUsage
0ebe2a9f 892 ): void {
f59e1027 893 workerNode.usage = workerUsage
0ebe2a9f
JB
894 }
895
a05c10de 896 /**
f06e48d8 897 * Pushes the given worker in the pool worker nodes.
ea7a90d3 898 *
38e795c1 899 * @param worker - The worker.
f06e48d8 900 * @returns The worker nodes length.
ea7a90d3 901 */
f06e48d8 902 private pushWorkerNode (worker: Worker): number {
9c16fb4b 903 this.workerNodes.push({
ffcbbad8 904 worker,
7ae6fb74 905 info: { id: this.getWorkerId(worker), started: true },
f59e1027 906 usage: this.getWorkerUsage(),
29ee7e9a 907 tasksQueue: new Queue<Task<Data>>()
ea7a90d3 908 })
9c16fb4b
JB
909 const workerNodeKey = this.getWorkerNodeKey(worker)
910 this.setWorkerNodeTasksUsage(
911 this.workerNodes[workerNodeKey],
912 this.getWorkerUsage(workerNodeKey)
913 )
914 return this.workerNodes.length
ea7a90d3 915 }
c923ce56 916
83fa0a36
JB
917 /**
918 * Gets the worker id.
919 *
920 * @param worker - The worker.
921 * @returns The worker id.
922 */
923 private getWorkerId (worker: Worker): number | undefined {
924 if (this.worker === WorkerTypes.thread) {
925 return worker.threadId
926 } else if (this.worker === WorkerTypes.cluster) {
927 return worker.id
928 }
929 }
930
8604aaab
JB
931 // /**
932 // * Sets the given worker in the pool worker nodes.
933 // *
934 // * @param workerNodeKey - The worker node key.
935 // * @param worker - The worker.
f59e1027 936 // * @param workerInfo - The worker info.
8604aaab
JB
937 // * @param workerUsage - The worker usage.
938 // * @param tasksQueue - The worker task queue.
939 // */
940 // private setWorkerNode (
941 // workerNodeKey: number,
942 // worker: Worker,
f59e1027 943 // workerInfo: WorkerInfo,
8604aaab
JB
944 // workerUsage: WorkerUsage,
945 // tasksQueue: Queue<Task<Data>>
946 // ): void {
947 // this.workerNodes[workerNodeKey] = {
948 // worker,
f59e1027
JB
949 // info: workerInfo,
950 // usage: workerUsage,
8604aaab
JB
951 // tasksQueue
952 // }
953 // }
51fe3d3c
JB
954
955 /**
f06e48d8 956 * Removes the given worker from the pool worker nodes.
51fe3d3c 957 *
f06e48d8 958 * @param worker - The worker.
51fe3d3c 959 */
416fd65c 960 private removeWorkerNode (worker: Worker): void {
f06e48d8 961 const workerNodeKey = this.getWorkerNodeKey(worker)
1f68cede
JB
962 if (workerNodeKey !== -1) {
963 this.workerNodes.splice(workerNodeKey, 1)
964 this.workerChoiceStrategyContext.remove(workerNodeKey)
965 }
51fe3d3c 966 }
adc3c320 967
2e81254d 968 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 969 this.beforeTaskExecutionHook(workerNodeKey, task)
2e81254d
JB
970 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
971 }
972
f9f00b5f 973 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
29ee7e9a 974 return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
adc3c320
JB
975 }
976
416fd65c 977 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
29ee7e9a 978 return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
adc3c320
JB
979 }
980
416fd65c 981 private tasksQueueSize (workerNodeKey: number): number {
4d8bf9e4 982 return this.workerNodes[workerNodeKey].tasksQueue.size
adc3c320 983 }
ff733df7 984
df593701
JB
985 private tasksMaxQueueSize (workerNodeKey: number): number {
986 return this.workerNodes[workerNodeKey].tasksQueue.maxSize
987 }
988
416fd65c
JB
989 private flushTasksQueue (workerNodeKey: number): void {
990 if (this.tasksQueueSize(workerNodeKey) > 0) {
29ee7e9a
JB
991 for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) {
992 this.executeTask(
993 workerNodeKey,
994 this.dequeueTask(workerNodeKey) as Task<Data>
995 )
ff733df7 996 }
ff733df7 997 }
df593701 998 this.workerNodes[workerNodeKey].tasksQueue.clear()
ff733df7
JB
999 }
1000
ef41a6e6
JB
1001 private flushTasksQueues (): void {
1002 for (const [workerNodeKey] of this.workerNodes.entries()) {
1003 this.flushTasksQueue(workerNodeKey)
1004 }
1005 }
b6b32453
JB
1006
1007 private setWorkerStatistics (worker: Worker): void {
1008 this.sendToWorker(worker, {
1009 statistics: {
87de9ff5
JB
1010 runTime:
1011 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 1012 .runTime.aggregate,
87de9ff5 1013 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
5df69fab 1014 .elu.aggregate
b6b32453
JB
1015 }
1016 })
1017 }
8604aaab 1018
9c16fb4b 1019 private getWorkerUsage (workerNodeKey?: number): WorkerUsage {
e3347a5c
JB
1020 const getTasksQueueSize = (workerNodeKey?: number): number => {
1021 return workerNodeKey != null ? this.tasksQueueSize(workerNodeKey) : 0
9c16fb4b 1022 }
df593701
JB
1023 const getTasksMaxQueueSize = (workerNodeKey?: number): number => {
1024 return workerNodeKey != null ? this.tasksMaxQueueSize(workerNodeKey) : 0
1025 }
8604aaab 1026 return {
9c16fb4b
JB
1027 tasks: {
1028 executed: 0,
1029 executing: 0,
1030 get queued (): number {
e3347a5c 1031 return getTasksQueueSize(workerNodeKey)
9c16fb4b 1032 },
df593701
JB
1033 get maxQueued (): number {
1034 return getTasksMaxQueueSize(workerNodeKey)
1035 },
9c16fb4b
JB
1036 failed: 0
1037 },
8604aaab 1038 runTime: {
932fc8be 1039 aggregate: 0,
8604aaab
JB
1040 average: 0,
1041 median: 0,
1042 history: new CircularArray()
1043 },
1044 waitTime: {
932fc8be 1045 aggregate: 0,
8604aaab
JB
1046 average: 0,
1047 median: 0,
1048 history: new CircularArray()
1049 },
5df69fab
JB
1050 elu: {
1051 idle: {
1052 aggregate: 0,
1053 average: 0,
1054 median: 0,
1055 history: new CircularArray()
1056 },
1057 active: {
1058 aggregate: 0,
1059 average: 0,
1060 median: 0,
1061 history: new CircularArray()
1062 },
1063 utilization: 0
1064 }
8604aaab
JB
1065 }
1066 }
c97c7edb 1067}