fix: ensure worker started message cannot be received for non existing
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
fc3e6586 1import crypto from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
2740a743 3import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
4import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
0d80593b 7 isPlainObject,
bbeadd16
JB
8 median
9} from '../utils'
34a0cfab 10import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
65d7a1c9 11import { CircularArray } from '../circular-array'
29ee7e9a 12import { Queue } from '../queue'
c4855468 13import {
65d7a1c9 14 type IPool,
7c5a1080 15 PoolEmitter,
c4855468 16 PoolEvents,
6b27d407 17 type PoolInfo,
c4855468 18 type PoolOptions,
6b27d407
JB
19 type PoolType,
20 PoolTypes,
184855e6 21 type TasksQueueOptions,
83fa0a36
JB
22 type WorkerType,
23 WorkerTypes
c4855468 24} from './pool'
e102732c
JB
25import type {
26 IWorker,
27 MessageHandler,
28 Task,
29 WorkerNode,
30 WorkerUsage
31} from './worker'
a35560ba 32import {
f0d7f803 33 Measurements,
a35560ba 34 WorkerChoiceStrategies,
a20f0ba5
JB
35 type WorkerChoiceStrategy,
36 type WorkerChoiceStrategyOptions
bdaf31cd
JB
37} from './selection-strategies/selection-strategies-types'
38import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
c97c7edb 39
729c563d 40/**
ea7a90d3 41 * Base class that implements some shared logic for all poolifier pools.
729c563d 42 *
38e795c1 43 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
44 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
45 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 46 */
c97c7edb 47export abstract class AbstractPool<
f06e48d8 48 Worker extends IWorker,
d3c8a1a8
S
49 Data = unknown,
50 Response = unknown
c4855468 51> implements IPool<Worker, Data, Response> {
afc003b2 52 /** @inheritDoc */
f06e48d8 53 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
4a6952ff 54
afc003b2 55 /** @inheritDoc */
7c0ba920
JB
56 public readonly emitter?: PoolEmitter
57
be0676b3 58 /**
a3445496 59 * The execution response promise map.
be0676b3 60 *
2740a743 61 * - `key`: The message id of each submitted task.
a3445496 62 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 63 *
a3445496 64 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 65 */
c923ce56
JB
66 protected promiseResponseMap: Map<
67 string,
68 PromiseResponseWrapper<Worker, Response>
69 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 70
a35560ba 71 /**
51fe3d3c 72 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
73 */
74 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
75 Worker,
76 Data,
77 Response
a35560ba
S
78 >
79
729c563d
S
80 /**
81 * Constructs a new poolifier pool.
82 *
38e795c1 83 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 84 * @param filePath - Path to the worker file.
38e795c1 85 * @param opts - Options for the pool.
729c563d 86 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  // A pool can only be created from the main side, never from a worker.
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // Validate every argument before anything is allocated.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Pin `this` on these methods so they keep working when detached from the instance.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  // The emitter only exists when events are enabled (checkPoolOptions defaults it to true).
  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  // Subclass hook, run before any worker node is created.
  this.setupHook()

  for (let created = 0; created < this.numberOfWorkers; created++) {
    this.createAndSetupWorker()
  }
}
123
/**
 * Checks that the worker file path is a non-blank string.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is missing, is not a string or only contains whitespace.
 */
private checkFilePath (filePath: string): void {
  // The `string` annotation is erased at runtime, so a plain JavaScript caller
  // can still hand over `null`, `undefined` or any non-string value.
  if (typeof filePath !== 'string' || filePath.trim().length === 0) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
132
8d3782fa
JB
/**
 * Checks that the requested number of workers is usable for this pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws Error if the number is missing, or is zero for a fixed pool.
 * @throws TypeError if the number is not a safe integer.
 * @throws RangeError if the number is negative.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Zero workers is only rejected for fixed pools.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new Error('Cannot instantiate a fixed pool with no worker')
  }
}
150
/**
 * Validates the pool options and writes the default value of every unset option into `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws TypeError if the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const poolOptions = this.opts

  poolOptions.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(poolOptions.workerChoiceStrategy)

  poolOptions.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    poolOptions.workerChoiceStrategyOptions
  )

  poolOptions.restartWorkerOnError = opts.restartWorkerOnError ?? true
  poolOptions.enableEvents = opts.enableEvents ?? true
  poolOptions.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and normalized when the queue is enabled.
  if (poolOptions.enableTasksQueue) {
    const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    poolOptions.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
  }
}
177
/**
 * Checks that the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws Error if the strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
187
0d80593b
JB
/**
 * Checks the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if the number of weights differs from the pool maximum size, or if the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  // When weights are given there must be exactly `maxSize` of them.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
215
a20f0ba5
JB
/**
 * Checks the tasks queue options. Nullish options and a nullish concurrency are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws TypeError if the options are not a plain object or the concurrency is not a safe integer.
 * @throws Error if the concurrency is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError('Invalid worker tasks concurrency: must be an integer')
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
239
f59e1027
JB
/**
 * Whether at least one worker node is not flagged as started in its info.
 */
private get starting (): boolean {
  return !this.workerNodes.every(workerNode => workerNode.info.started)
}
243
/**
 * Whether at least one worker node is flagged as started in its info.
 */
private get started (): boolean {
  const firstStartedKey = this.workerNodes.findIndex(
    workerNode => workerNode.info.started
  )
  return firstStartedKey !== -1
}
247
08f3f44c 248 /** @inheritDoc */
6b27d407
JB
public get info (): PoolInfo {
  // Collect every counter in a single pass over the worker nodes.
  let idleWorkerNodes = 0
  let busyWorkerNodes = 0
  let executedTasks = 0
  let executingTasks = 0
  let queuedTasks = 0
  let maxQueuedTasks = 0
  let failedTasks = 0
  for (const workerNode of this.workerNodes) {
    const { tasks } = workerNode.usage
    // A node is idle with no task executing, busy with at least one.
    if (tasks.executing === 0) {
      ++idleWorkerNodes
    }
    if (tasks.executing > 0) {
      ++busyWorkerNodes
    }
    executedTasks += tasks.executed
    executingTasks += tasks.executing
    queuedTasks += tasks.queued
    maxQueuedTasks += tasks.maxQueued
    failedTasks += tasks.failed
  }
  return {
    type: this.type,
    worker: this.worker,
    minSize: this.minSize,
    maxSize: this.maxSize,
    workerNodes: this.workerNodes.length,
    idleWorkerNodes,
    busyWorkerNodes,
    executedTasks,
    executingTasks,
    queuedTasks,
    maxQueuedTasks,
    failedTasks
  }
}
08f3f44c 295
8881ae32
JB
296 /**
297 * Pool type.
298 *
299 * If it is `'dynamic'`, it provides the `max` property.
300 */
301 protected abstract get type (): PoolType
302
184855e6
JB
303 /**
304 * Gets the worker type.
305 */
306 protected abstract get worker (): WorkerType
307
c2ade475 308 /**
6b27d407 309 * Pool minimum size.
c2ade475 310 */
6b27d407 311 protected abstract get minSize (): number
ff733df7
JB
312
313 /**
6b27d407 314 * Pool maximum size.
ff733df7 315 */
6b27d407 316 protected abstract get maxSize (): number
a35560ba 317
f59e1027
JB
318 /**
319 * Get the worker given its id.
320 *
321 * @param workerId - The worker id.
322 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
323 */
private getWorkerById (workerId: number): Worker | undefined {
  // The first worker node whose info id matches wins.
  for (const workerNode of this.workerNodes) {
    if (workerNode.info.id === workerId) {
      return workerNode.worker
    }
  }
  return undefined
}
328
ffcbbad8 329 /**
f06e48d8 330 * Gets the worker node key of the given worker.
ffcbbad8
JB
331 *
332 * @param worker - The worker.
f59e1027 333 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 334 */
f06e48d8
JB
private getWorkerNodeKey (worker: Worker): number {
  // Workers are matched by identity; the key is the index in `workerNodes`.
  const holdsWorker = (workerNode: WorkerNode<Worker, Data>): boolean =>
    workerNode.worker === worker
  return this.workerNodes.findIndex(holdsWorker)
}
340
afc003b2 341 /** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Replace the usage of every worker node and set its statistics again.
  // NOTE(review): `getWorkerUsage` and `setWorkerStatistics` are defined outside this view.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    this.setWorkerNodeTasksUsage(
      workerNode,
      this.getWorkerUsage(workerNodeKey)
    )
    this.setWorkerStatistics(workerNode.worker)
  })
}
362
363 /** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first so that invalid options never reach the pool options or the context.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const poolOptions = this.opts
  poolOptions.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(workerChoiceStrategyOptions)
}
373
a20f0ba5 374 /** @inheritDoc */
8f52842f
JB
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  // Flush the tasks queues when an enabled queue gets switched off.
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
385
386 /** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Queueing is disabled: drop any leftover tasks queue options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
396
/**
 * Builds the tasks queue options, applying the defaults.
 *
 * @param tasksQueueOptions - The tasks queue options given by the caller, possibly nullish.
 * @returns The tasks queue options, with `concurrency` defaulting to `1`.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
404
c319c66b
JB
405 /**
406 * Whether the pool is full or not.
407 *
408 * The pool filling boolean status.
409 */
dea903a8
JB
protected get full (): boolean {
  // Full as soon as the number of worker nodes reaches the pool maximum size.
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
c2ade475 413
c319c66b
JB
414 /**
415 * Whether the pool is busy or not.
416 *
417 * The pool busyness boolean status.
418 */
419 protected abstract get busy (): boolean
7c0ba920 420
6c6afb84
JB
421 /**
422 * Whether worker nodes are executing at least one task.
423 *
424 * @returns Worker nodes busyness boolean status.
425 */
protected internalBusy (): boolean {
  // Busy when no worker node is left with zero executing tasks.
  return this.workerNodes.every(
    workerNode => workerNode.usage.tasks.executing !== 0
  )
}
433
afc003b2 434 /** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  // The submission timestamp is later used to compute the task wait time.
  const timestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const taskId = crypto.randomUUID()
  const submittedTask: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp,
    id: taskId
  }
  // Register the promise callbacks under the task id so that the worker
  // message listener can settle the promise when the response comes back.
  const responsePromise = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(taskId, {
      resolve,
      reject,
      worker: this.workerNodes[workerNodeKey].worker
    })
  })
  // With the tasks queue enabled, the task is queued when the pool is busy or
  // when the chosen worker node already runs `concurrency` tasks.
  const shallEnqueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      this.workerNodes[workerNodeKey].usage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (shallEnqueue) {
    this.enqueueTask(workerNodeKey, submittedTask)
  } else {
    this.executeTask(workerNodeKey, submittedTask)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return responsePromise
}
c97c7edb 467
afc003b2 468 /** @inheritDoc */
public async destroy (): Promise<void> {
  // Flush each worker node tasks queue, then terminate its worker; all nodes are handled concurrently.
  const terminations = this.workerNodes.map(
    async (workerNode, workerNodeKey) => {
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      await this.destroyWorker(workerNode.worker)
    }
  )
  await Promise.all(terminations)
}
478
4a6952ff 479 /**
6c6afb84 480 * Terminates the given worker.
4a6952ff 481 *
f06e48d8 482 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
483 */
484 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 485
729c563d 486 /**
6677a3d3
JB
487 * Setup hook to execute code before worker nodes are created in the abstract constructor.
488 * Can be overridden.
afc003b2
JB
489 *
490 * @virtual
729c563d 491 */
protected setupHook (): void {
  // No-op by default: the constructor calls this before creating the worker nodes.
}
c97c7edb 495
729c563d 496 /**
280c2a77
S
497 * Should return whether the worker is the main worker or not.
498 */
499 protected abstract isMain (): boolean
500
501 /**
2e81254d 502 * Hook executed before the worker task execution.
bf9549ae 503 * Can be overridden.
729c563d 504 *
f06e48d8 505 * @param workerNodeKey - The worker node key.
1c6fe997 506 * @param task - The task to execute.
729c563d 507 */
1c6fe997
JB
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const { usage } = this.workerNodes[workerNodeKey]
  // The task counts as executing from now on.
  usage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(usage, task)
}
516
c01733f1 517 /**
2e81254d 518 * Hook executed after the worker task execution.
bf9549ae 519 * Can be overridden.
c01733f1 520 *
c923ce56 521 * @param worker - The worker.
38e795c1 522 * @param message - The received message.
c01733f1 523 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const { usage } = this.workerNodes[workerNodeKey]
  // Task counters first: the run time and ELU updates read them.
  this.updateTaskStatisticsWorkerUsage(usage, message)
  this.updateRunTimeWorkerUsage(usage, message)
  this.updateEluWorkerUsage(usage, message)
}
533
/**
 * Updates the task counters of a worker usage once a task response has been received.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  tasks.executing -= 1
  tasks.executed += 1
  // A response carrying a task error also counts as a failed task.
  if (message.taskError != null) {
    tasks.failed += 1
  }
}
545
a4e07f72
JB
/**
 * Updates the run time statistics of a worker usage from a task response.
 *
 * Nothing is updated unless the current worker choice strategy requires the
 * run time aggregate; the average and the median are only updated when they
 * are required as well.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  if (!runTimeRequirements.aggregate) {
    return
  }
  const taskRunTime = message.taskPerformance?.runTime
  workerUsage.runTime.aggregate += taskRunTime ?? 0
  // The average is computed over the tasks that did not fail. Guard on that
  // divisor itself: when every executed task failed it is zero and the
  // division would yield NaN or Infinity.
  const successfulTasks =
    workerUsage.tasks.executed - workerUsage.tasks.failed
  if (runTimeRequirements.average && successfulTasks > 0) {
    workerUsage.runTime.average =
      workerUsage.runTime.aggregate / successfulTasks
  }
  // The median needs an actual sample: skip messages without a run time.
  if (runTimeRequirements.median && taskRunTime != null) {
    workerUsage.runTime.history.push(taskRunTime)
    workerUsage.runTime.median = median(workerUsage.runTime.history)
  }
}
574
a4e07f72
JB
/**
 * Updates the wait time statistics of a worker usage for a task about to be executed.
 *
 * The wait time is the delay between the task timestamp and now; a task
 * without timestamp has a wait time of zero. Nothing is updated unless the
 * current worker choice strategy requires the wait time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const taskWaitTime = now - (task.timestamp ?? now)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  if (!waitTimeRequirements.aggregate) {
    return
  }
  workerUsage.waitTime.aggregate += taskWaitTime
  // Guard on the divisor itself: when every executed task failed it is zero
  // and the division would yield NaN or Infinity.
  const successfulTasks =
    workerUsage.tasks.executed - workerUsage.tasks.failed
  if (waitTimeRequirements.average && successfulTasks > 0) {
    workerUsage.waitTime.average =
      workerUsage.waitTime.aggregate / successfulTasks
  }
  if (waitTimeRequirements.median) {
    workerUsage.waitTime.history.push(taskWaitTime)
    workerUsage.waitTime.median = median(workerUsage.waitTime.history)
  }
}
605
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage from a task response.
 *
 * Nothing is updated unless the current worker choice strategy requires the
 * ELU aggregate; the averages and the medians are only updated when they are
 * required as well.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  if (!eluRequirements.aggregate) {
    return
  }
  const taskElu = message.taskPerformance?.elu
  if (taskElu != null) {
    workerUsage.elu.idle.aggregate += taskElu.idle
    workerUsage.elu.active.aggregate += taskElu.active
    // The utilization is the mean of the previous value and the new sample.
    workerUsage.elu.utilization =
      (workerUsage.elu.utilization + taskElu.utilization) / 2
  }
  // Guard on the divisor itself: when every executed task failed it is zero
  // and the divisions would yield NaN or Infinity.
  const successfulTasks =
    workerUsage.tasks.executed - workerUsage.tasks.failed
  if (eluRequirements.average && successfulTasks > 0) {
    workerUsage.elu.idle.average =
      workerUsage.elu.idle.aggregate / successfulTasks
    workerUsage.elu.active.average =
      workerUsage.elu.active.aggregate / successfulTasks
  }
  // The medians need an actual sample: skip messages without ELU data.
  if (eluRequirements.median && taskElu != null) {
    workerUsage.elu.idle.history.push(taskElu.idle)
    workerUsage.elu.active.history.push(taskElu.active)
    workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
    workerUsage.elu.active.median = median(workerUsage.elu.active.history)
  }
}
650
280c2a77 651 /**
f06e48d8 652 * Chooses a worker node for the next task.
280c2a77 653 *
6c6afb84 654 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 655 *
20dcad1a 656 * @returns The worker node key
280c2a77 657 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  // The dynamic worker is created either way; the strategy policy decides
  // whether the task goes straight to it or to the worker the strategy picks.
  const dynamicWorker = this.createAndSetupDynamicWorker()
  return this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
669
6c6afb84
JB
670 /**
671 * Conditions for dynamic worker creation.
672 *
673 * @returns Whether to create a dynamic worker or not.
674 */
private shallCreateDynamicWorker (): boolean {
  // Only a dynamic pool that is not full yet may grow...
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  // ...and only when every worker node is executing at least one task.
  return this.internalBusy()
}
678
280c2a77 679 /**
675bb809 680 * Sends a message to the given worker.
280c2a77 681 *
38e795c1
JB
682 * @param worker - The worker which should receive the message.
683 * @param message - The message.
280c2a77
S
684 */
685 protected abstract sendToWorker (
686 worker: Worker,
687 message: MessageValue<Data>
688 ): void
689
4a6952ff 690 /**
f06e48d8 691 * Registers a listener callback on the given worker.
4a6952ff 692 *
38e795c1
JB
693 * @param worker - The worker which should register a listener.
694 * @param listener - The message listener callback.
4a6952ff 695 */
e102732c
JB
private registerWorkerMessageListener<Message extends Data | Response>(
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void {
  // The listener is attached to the worker 'message' event.
  const messageHandler = listener as MessageHandler<Worker>
  worker.on('message', messageHandler)
}
c97c7edb 702
729c563d 703 /**
41344292 704 * Creates a new worker.
6c6afb84
JB
705 *
706 * @returns Newly created worker.
729c563d 707 */
280c2a77 708 protected abstract createWorker (): Worker
c97c7edb 709
729c563d 710 /**
f06e48d8 711 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
6677a3d3 712 * Can be overridden.
729c563d 713 *
38e795c1 714 * @param worker - The newly created worker.
729c563d 715 */
protected afterWorkerSetup (worker: Worker): void {
  // Start listening to the messages sent by the worker.
  const onWorkerMessage = this.workerListener()
  this.registerWorkerMessageListener(worker, onWorkerMessage)
}
c97c7edb 720
4a6952ff 721 /**
f06e48d8 722 * Creates a new worker and sets it up completely in the pool worker nodes.
4a6952ff
JB
723 *
724 * @returns New, completely set up worker.
725 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()
  const { messageHandler, errorHandler, onlineHandler, exitHandler } =
    this.opts

  // User-provided handlers, each defaulting to a no-op.
  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    this.emitter?.emit(PoolEvents.error, error)
    // Create another worker on error, unless some worker node is still
    // waiting for its started message.
    if (this.opts.restartWorkerOnError === true && !this.starting) {
      this.createAndSetupWorker()
    }
  })
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  // An exited worker is removed from the pool worker nodes.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)

  this.setWorkerStatistics(worker)

  this.afterWorkerSetup(worker)

  return worker
}
be0676b3 753
930dcf12
JB
754 /**
755 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
756 *
757 * @returns New, completely set up dynamic worker.
758 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    const hasNoExecutingTask = (): boolean =>
      this.workerNodes[workerNodeKey].usage.tasks.executing === 0
    // A hard kill always terminates the worker. Any other kill message only
    // does so when the worker executes no task and, with the tasks queue
    // enabled, has no queued task either.
    const shallDestroy =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (message.kill != null &&
        ((this.opts.enableTasksQueue === false && hasNoExecutingTask()) ||
          (this.opts.enableTasksQueue === true &&
            hasNoExecutingTask() &&
            this.tasksQueueSize(workerNodeKey) === 0)))
    if (shallDestroy) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  return worker
}
778
be0676b3 779 /**
ff733df7 780 * This function is the listener registered for each worker message.
be0676b3 781 *
bdacc2d2 782 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
783 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    if (message.workerId != null && message.started != null) {
      // Worker started message received
      const worker = this.getWorkerById(message.workerId)
      if (worker == null) {
        throw new Error('Worker started message received from unknown worker')
      }
      this.workerNodes[this.getWorkerNodeKey(worker)].info.started =
        message.started
      return
    }
    const taskId = message.id
    if (taskId == null) {
      return
    }
    // Task execution response received
    const promiseResponse = this.promiseResponseMap.get(taskId)
    if (promiseResponse == null) {
      return
    }
    if (message.taskError != null) {
      this.emitter?.emit(PoolEvents.taskError, message.taskError)
      promiseResponse.reject(message.taskError.message)
    } else {
      promiseResponse.resolve(message.data as Response)
    }
    this.afterTaskExecutionHook(promiseResponse.worker, message)
    this.promiseResponseMap.delete(taskId)
    const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
    // The worker just finished a task: give it its next queued task, if any.
    if (
      this.opts.enableTasksQueue === true &&
      this.tasksQueueSize(workerNodeKey) > 0
    ) {
      this.executeTask(
        workerNodeKey,
        this.dequeueTask(workerNodeKey) as Task<Data>
      )
    }
    this.workerChoiceStrategyContext.update(workerNodeKey)
  }
}
7c0ba920 824
/**
 * Emits the `busy` and `full` pool events when their conditions are met.
 * Does nothing when the pool has no emitter.
 */
private checkAndEmitEvents (): void {
  if (this.emitter == null) {
    return
  }
  if (this.busy) {
    this.emitter.emit(PoolEvents.busy, this.info)
  }
  // The `full` event is only emitted by dynamic pools.
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter.emit(PoolEvents.full, this.info)
  }
}
835
0ebe2a9f
JB
836 /**
837 * Sets the tasks usage of the given worker node in the pool.
838 *
839 * @param workerNode - The worker node.
a4e07f72 840 * @param workerUsage - The worker usage.
0ebe2a9f
JB
841 */
private setWorkerNodeTasksUsage (
  workerNode: WorkerNode<Worker, Data>,
  workerUsage: WorkerUsage
): void {
  // The previous usage object of the node is replaced, not merged.
  workerNode.usage = workerUsage
}
848
a05c10de 849 /**
f06e48d8 850 * Pushes the given worker in the pool worker nodes.
ea7a90d3 851 *
38e795c1 852 * @param worker - The worker.
f06e48d8 853 * @returns The worker nodes length.
ea7a90d3 854 */
private pushWorkerNode (worker: Worker): number {
  const workerNode: WorkerNode<Worker, Data> = {
    worker,
    info: { id: this.getWorkerId(worker), started: false },
    usage: this.getWorkerUsage(),
    tasksQueue: new Queue<Task<Data>>()
  }
  this.workerNodes.push(workerNode)
  // Set the usage a second time, now with the worker node key that exists after the push.
  // NOTE(review): `getWorkerUsage` is defined outside this view — confirm why both calls are needed.
  const workerNodeKey = this.getWorkerNodeKey(worker)
  this.setWorkerNodeTasksUsage(
    this.workerNodes[workerNodeKey],
    this.getWorkerUsage(workerNodeKey)
  )
  return this.workerNodes.length
}
c923ce56 869
83fa0a36
JB
870 /**
871 * Gets the worker id.
872 *
873 * @param worker - The worker.
874 * @returns The worker id.
875 */
private getWorkerId (worker: Worker): number | undefined {
  // The property holding the id depends on the worker type.
  switch (this.worker) {
    case WorkerTypes.thread:
      return worker.threadId
    case WorkerTypes.cluster:
      return worker.id
    default:
      return undefined
  }
}
883
8604aaab
JB
884 // /**
885 // * Sets the given worker in the pool worker nodes.
886 // *
887 // * @param workerNodeKey - The worker node key.
888 // * @param worker - The worker.
f59e1027 889 // * @param workerInfo - The worker info.
8604aaab
JB
890 // * @param workerUsage - The worker usage.
891 // * @param tasksQueue - The worker task queue.
892 // */
893 // private setWorkerNode (
894 // workerNodeKey: number,
895 // worker: Worker,
f59e1027 896 // workerInfo: WorkerInfo,
8604aaab
JB
897 // workerUsage: WorkerUsage,
898 // tasksQueue: Queue<Task<Data>>
899 // ): void {
900 // this.workerNodes[workerNodeKey] = {
901 // worker,
f59e1027
JB
902 // info: workerInfo,
903 // usage: workerUsage,
8604aaab
JB
904 // tasksQueue
905 // }
906 // }
51fe3d3c
JB
907
/**
 * Drops the node of the given worker from the pool worker nodes and notifies
 * the worker choice strategy context of the removed node key.
 *
 * Does nothing when the worker has no node in the pool.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const nodeKey = this.getWorkerNodeKey(worker)
  if (nodeKey === -1) {
    return
  }
  this.workerNodes.splice(nodeKey, 1)
  this.workerChoiceStrategyContext.remove(nodeKey)
}
adc3c320 920
/**
 * Runs the before task execution hook, then sends the task to the worker of
 * the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  // Read the worker only after the hook has run, as the original code does.
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
925
/**
 * Adds the task to the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the queue `enqueue()` call (presumably the new queue size - confirm against `Queue`).
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.enqueue(task)
}
929
/**
 * Takes the next task out of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the queue yields none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.dequeue()
}
933
/**
 * Reads the current `size` of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.size
}
ff733df7 937
df593701
JB
/**
 * Reads the `maxSize` of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue `maxSize` (presumably the largest size the queue has reached - confirm against `Queue`).
 */
private tasksMaxQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.maxSize
}
941
416fd65c
JB
/**
 * Executes every task waiting in the tasks queue of the given worker node,
 * then clears the queue.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  // Snapshot the number of pending tasks once. Re-reading the size in the
  // loop condition while dequeuing makes the bound shrink as the index grows,
  // so only about half of the tasks would be executed before the queue is
  // cleared and the remaining ones silently dropped.
  const pendingTasks = this.tasksQueueSize(workerNodeKey)
  for (let i = 0; i < pendingTasks; i++) {
    const task = this.dequeueTask(workerNodeKey)
    if (task == null) {
      // The queue ran dry earlier than its reported size: nothing left to run.
      break
    }
    this.executeTask(workerNodeKey, task)
  }
  this.workerNodes[workerNodeKey].tasksQueue.clear()
}
953
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node in the pool, in node key order.
 */
private flushTasksQueues (): void {
  for (let nodeKey = 0; nodeKey < this.workerNodes.length; nodeKey++) {
    this.flushTasksQueue(nodeKey)
  }
}
b6b32453
JB
959
/**
 * Sends the given worker a `statistics` message built from the task
 * statistics requirements of the worker choice strategy context.
 *
 * The `runTime` and `elu` fields carry the `aggregate` value of the matching
 * requirement (presumably telling the worker which measurements to collect -
 * confirm against the worker implementation).
 *
 * @param worker - The worker to send the message to.
 */
private setWorkerStatistics (worker: Worker): void {
  const runTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const elu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  this.sendToWorker(worker, { statistics: { runTime, elu } })
}
8604aaab 971
/**
 * Builds a fresh worker usage with every counter and measurement zeroed and
 * empty measurement histories.
 *
 * `tasks.queued` and `tasks.maxQueued` are getters: on each access they read
 * the tasks queue of the worker node at `workerNodeKey`, or evaluate to 0
 * when no key is given.
 *
 * @param workerNodeKey - The worker node key the queue counters are bound to, if any.
 * @returns The new worker usage.
 */
private getWorkerUsage (workerNodeKey?: number): WorkerUsage {
  // Arrow functions keep `this` bound to the pool inside the object getters.
  const queuedTasks = (): number =>
    workerNodeKey == null ? 0 : this.tasksQueueSize(workerNodeKey)
  const maxQueuedTasks = (): number =>
    workerNodeKey == null ? 0 : this.tasksMaxQueueSize(workerNodeKey)
  return {
    tasks: {
      executed: 0,
      executing: 0,
      get queued (): number {
        return queuedTasks()
      },
      get maxQueued (): number {
        return maxQueuedTasks()
      },
      failed: 0
    },
    runTime: {
      aggregate: 0,
      average: 0,
      median: 0,
      history: new CircularArray()
    },
    waitTime: {
      aggregate: 0,
      average: 0,
      median: 0,
      history: new CircularArray()
    },
    elu: {
      idle: {
        aggregate: 0,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      active: {
        aggregate: 0,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      utilization: 0
    }
  }
}
c97c7edb 1020}