Merge branch 'master' into elu-strategy
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
fc3e6586 1import crypto from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
2740a743 3import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
4import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
0d80593b 7 isPlainObject,
bbeadd16
JB
8 median
9} from '../utils'
34a0cfab 10import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
65d7a1c9 11import { CircularArray } from '../circular-array'
29ee7e9a 12import { Queue } from '../queue'
c4855468 13import {
65d7a1c9 14 type IPool,
7c5a1080 15 PoolEmitter,
c4855468 16 PoolEvents,
6b27d407 17 type PoolInfo,
c4855468 18 type PoolOptions,
6b27d407
JB
19 type PoolType,
20 PoolTypes,
184855e6
JB
21 type TasksQueueOptions,
22 type WorkerType
c4855468 23} from './pool'
f06e48d8 24import type { IWorker, Task, TasksUsage, WorkerNode } from './worker'
a35560ba
S
25import {
26 WorkerChoiceStrategies,
a20f0ba5
JB
27 type WorkerChoiceStrategy,
28 type WorkerChoiceStrategyOptions
bdaf31cd
JB
29} from './selection-strategies/selection-strategies-types'
30import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
c97c7edb 31
729c563d 32/**
ea7a90d3 33 * Base class that implements some shared logic for all poolifier pools.
729c563d 34 *
38e795c1
JB
35 * @typeParam Worker - Type of worker which manages this pool.
36 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
02706357 37 * @typeParam Response - Type of execution response. This can only be serializable data.
729c563d 38 */
c97c7edb 39export abstract class AbstractPool<
f06e48d8 40 Worker extends IWorker,
d3c8a1a8
S
41 Data = unknown,
42 Response = unknown
c4855468 43> implements IPool<Worker, Data, Response> {
afc003b2 44 /** @inheritDoc */
f06e48d8 45 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
4a6952ff 46
afc003b2 47 /** @inheritDoc */
7c0ba920
JB
48 public readonly emitter?: PoolEmitter
49
be0676b3 50 /**
a3445496 51 * The execution response promise map.
be0676b3 52 *
2740a743 53 * - `key`: The message id of each submitted task.
a3445496 54 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 55 *
a3445496 56 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 57 */
c923ce56
JB
58 protected promiseResponseMap: Map<
59 string,
60 PromiseResponseWrapper<Worker, Response>
61 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 62
a35560ba 63 /**
51fe3d3c 64 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba 65 *
51fe3d3c 66 * Default to a round robin algorithm.
a35560ba
S
67 */
68 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
69 Worker,
70 Data,
71 Response
a35560ba
S
72 >
73
729c563d
S
/**
 * Builds a new poolifier pool: validates the arguments, runs the setup hook,
 * spawns the initial workers and wires the worker choice strategy context.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }
  // Validate every input before any worker gets created.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bind the methods to this instance.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  this.setupHook()

  let workersToCreate = this.numberOfWorkers
  while (workersToCreate > 0) {
    this.createAndSetupWorker()
    --workersToCreate
  }

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)
}
117
// Rejects a missing or blank worker file path.
private checkFilePath (filePath: string): void {
  const isBlank = typeof filePath === 'string' && filePath.trim().length === 0
  if (filePath == null || isBlank) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
126
8d3782fa
JB
// Validates the requested number of workers; each failed check throws its own error type.
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Only a fixed pool is refused when it would hold no worker.
  const isEmptyFixedPool =
    this.type === PoolTypes.fixed && numberOfWorkers === 0
  if (isEmptyFixedPool) {
    throw new Error('Cannot instantiate a fixed pool with no worker')
  }
}
144
// Validates the pool options and fills in the defaults on `this.opts`.
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  // Worker choice strategy: defaults to round robin.
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
  // Worker choice strategy options.
  this.opts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )
  // Boolean switches.
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  // Tasks queue options are only validated and built when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const queueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(queueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
171
// Throws when the given strategy is not one of the `WorkerChoiceStrategies` values.
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
181
0d80593b
JB
// Validates the worker choice strategy options: a plain object whose optional
// `weights` must hold exactly `maxSize` entries.
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights } = workerChoiceStrategyOptions
  if (weights == null) {
    return
  }
  if (Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
}
199
a20f0ba5
JB
// Validates the tasks queue options: nullish is accepted, otherwise it must be
// a plain object and its concurrency must not be zero or negative.
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  const isProvided = tasksQueueOptions != null
  if (isProvided && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  // An undefined concurrency never satisfies `<= 0`, so it passes the check.
  const concurrency = tasksQueueOptions?.concurrency as number
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
214
08f3f44c 215 /** @inheritDoc */
6b27d407
JB
public get info (): PoolInfo {
  // Read the pool descriptors first, then aggregate the worker node counters in one pass.
  const { type, worker, minSize, maxSize } = this
  let idleWorkerNodes = 0
  let busyWorkerNodes = 0
  let runningTasks = 0
  let queuedTasks = 0
  let maxQueuedTasks = 0
  for (const { tasksUsage, tasksQueue } of this.workerNodes) {
    if (tasksUsage.running === 0) {
      ++idleWorkerNodes
    } else if (tasksUsage.running > 0) {
      ++busyWorkerNodes
    }
    runningTasks += tasksUsage.running
    queuedTasks += tasksQueue.size
    maxQueuedTasks += tasksQueue.maxSize
  }
  return {
    type,
    worker,
    minSize,
    maxSize,
    workerNodes: this.workerNodes.length,
    idleWorkerNodes,
    busyWorkerNodes,
    runningTasks,
    queuedTasks,
    maxQueuedTasks
  }
}
08f3f44c 249
8881ae32
JB
250 /**
251 * Pool type.
252 *
253 * If it is `'dynamic'`, it provides the `max` property.
254 */
255 protected abstract get type (): PoolType
256
184855e6
JB
257 /**
258 * Gets the worker type.
259 */
260 protected abstract get worker (): WorkerType
261
c2ade475 262 /**
6b27d407 263 * Pool minimum size.
c2ade475 264 */
6b27d407 265 protected abstract get minSize (): number
ff733df7
JB
266
267 /**
6b27d407 268 * Pool maximum size.
ff733df7 269 */
6b27d407 270 protected abstract get maxSize (): number
a35560ba 271
/**
 * Looks up the worker node key (its index in `workerNodes`) of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
283
afc003b2 284 /** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  // Reset the statistics gathered for the previous strategy on every worker node.
  for (const workerNode of this.workerNodes) {
    this.setWorkerNodeTasksUsage(workerNode, {
      ran: 0,
      // Tasks still in flight will decrement this counter when they complete:
      // keep its current value so that it never goes negative.
      running: workerNode.tasksUsage.running,
      runTime: 0,
      runTimeHistory: new CircularArray(),
      avgRunTime: 0,
      medRunTime: 0,
      waitTime: 0,
      waitTimeHistory: new CircularArray(),
      avgWaitTime: 0,
      medWaitTime: 0,
      error: 0,
      elu: undefined
    })
  }
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  // Options are optional: only validate and apply them when given.
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
}
314
315 /** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first so that invalid options never reach the pool options nor the context.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const { opts, workerChoiceStrategyContext } = this
  opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  workerChoiceStrategyContext.setOptions(opts.workerChoiceStrategyOptions)
}
325
a20f0ba5 326 /** @inheritDoc */
8f52842f
JB
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  // Switching the queue off: hand the already queued tasks over to the workers first.
  const wasEnabled = this.opts.enableTasksQueue === true
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
337
338 /** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  // Without an enabled tasks queue, its options are dropped from the pool options.
  if (this.opts.enableTasksQueue !== true) {
    delete this.opts.tasksQueueOptions
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
348
// Builds the effective tasks queue options; concurrency defaults to 1.
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
356
c319c66b
JB
/**
 * Whether the pool is full or not.
 *
 * The pool is full once it holds at least `maxSize` worker nodes.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
c2ade475 365
c319c66b
JB
366 /**
367 * Whether the pool is busy or not.
368 *
369 * The pool busyness boolean status.
370 */
371 protected abstract get busy (): boolean
7c0ba920 372
// The pool is busy when no worker node is idle, i.e. none has zero running tasks.
protected internalBusy (): boolean {
  const hasIdleWorkerNode = this.workerNodes.some(
    workerNode => workerNode.tasksUsage.running === 0
  )
  return !hasIdleWorkerNode
}
380
afc003b2 381 /** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  const submissionTimestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const task: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    submissionTimestamp,
    id: crypto.randomUUID()
  }
  // Register the response callbacks under the task id before the task is dispatched.
  const response = new Promise<Response>((resolve, reject) => {
    const { worker } = this.workerNodes[workerNodeKey]
    this.promiseResponseMap.set(task.id as string, { resolve, reject, worker })
  })
  // Queue the task when queueing is enabled and either the pool is busy or the
  // chosen worker node already runs as many tasks as the configured concurrency.
  const mustEnqueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      this.workerNodes[workerNodeKey].tasksUsage.running >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (mustEnqueue) {
    this.enqueueTask(workerNodeKey, task)
  } else {
    this.executeTask(workerNodeKey, task)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return response
}
c97c7edb 415
afc003b2 416 /** @inheritDoc */
public async destroy (): Promise<void> {
  // Flush each worker node tasks queue, then shut its worker down; all shutdowns run concurrently.
  const shutdowns = this.workerNodes.map(
    async (workerNode, workerNodeKey) => {
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      await this.destroyWorker(workerNode.worker)
    }
  )
  await Promise.all(shutdowns)
}
426
4a6952ff 427 /**
f06e48d8 428 * Shutdowns the given worker.
4a6952ff 429 *
f06e48d8 430 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
431 */
432 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 433
/**
 * Hook called by the abstract constructor before the worker nodes are created.
 * Does nothing by default; can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // Nothing to set up by default
}
c97c7edb 443
729c563d 444 /**
280c2a77
S
445 * Should return whether the worker is the main worker or not.
446 */
447 protected abstract isMain (): boolean
448
/**
 * Hook executed before the worker task execution: counts one more running task
 * on the worker node.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 */
protected beforeTaskExecutionHook (workerNodeKey: number): void {
  const { tasksUsage } = this.workerNodes[workerNodeKey]
  tasksUsage.running += 1
}
458
/**
 * Hook executed after the worker task execution: updates the tasks usage
 * statistics of the worker node owning the given worker.
 * Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNode = this.workerNodes[this.getWorkerNodeKey(worker)]
  // The worker may already have been removed from the pool (key `-1`):
  // there is then no tasks usage left to update.
  if (workerNode == null) {
    return
  }
  const workerTasksUsage = workerNode.tasksUsage
  --workerTasksUsage.running
  ++workerTasksUsage.ran
  if (message.error != null) {
    ++workerTasksUsage.error
  }
  this.updateRunTimeTasksUsage(workerTasksUsage, message)
  this.updateWaitTimeTasksUsage(workerTasksUsage, message)
  this.updateEluTasksUsage(workerTasksUsage, message)
}
481
// Updates the run time statistics (total, average, median) required by the current strategy.
private updateRunTimeTasksUsage (
  workerTasksUsage: TasksUsage,
  message: MessageValue<Response>
): void {
  const requiredStatistics =
    this.workerChoiceStrategyContext.getRequiredStatistics()
  if (!requiredStatistics.runTime) {
    return
  }
  workerTasksUsage.runTime += message.runTime ?? 0
  // The average needs at least one ran task to avoid dividing by zero.
  if (requiredStatistics.avgRunTime && workerTasksUsage.ran !== 0) {
    workerTasksUsage.avgRunTime =
      workerTasksUsage.runTime / workerTasksUsage.ran
  }
  // The median is only refreshed when the message carries a run time.
  if (requiredStatistics.medRunTime && message.runTime != null) {
    workerTasksUsage.runTimeHistory.push(message.runTime)
    workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory)
  }
}
504
// Updates the wait time statistics (total, average, median) required by the current strategy.
private updateWaitTimeTasksUsage (
  workerTasksUsage: TasksUsage,
  message: MessageValue<Response>
): void {
  const requiredStatistics =
    this.workerChoiceStrategyContext.getRequiredStatistics()
  if (!requiredStatistics.waitTime) {
    return
  }
  workerTasksUsage.waitTime += message.waitTime ?? 0
  // The average needs at least one ran task to avoid dividing by zero.
  if (requiredStatistics.avgWaitTime && workerTasksUsage.ran !== 0) {
    workerTasksUsage.avgWaitTime =
      workerTasksUsage.waitTime / workerTasksUsage.ran
  }
  // The median is only refreshed when the message carries a wait time.
  if (requiredStatistics.medWaitTime && message.waitTime != null) {
    workerTasksUsage.waitTimeHistory.push(message.waitTime)
    workerTasksUsage.medWaitTime = median(workerTasksUsage.waitTimeHistory)
  }
}
527
62c15a68
JB
// Accumulates the event loop utilization (ELU) reported by a worker, when the
// current strategy requires it.
private updateEluTasksUsage (
  workerTasksUsage: TasksUsage,
  message: MessageValue<Response>
): void {
  if (
    !this.workerChoiceStrategyContext.getRequiredStatistics().elu ||
    message.elu == null
  ) {
    return
  }
  // First sample for this worker node: take it as is.
  if (workerTasksUsage.elu == null) {
    workerTasksUsage.elu = message.elu
    return
  }
  // `idle` and `active` are durations and can be summed; `utilization` is a
  // ratio, so it is recomputed from the cumulated durations instead of being
  // summed (a sum of ratios can exceed 1 and is no longer a utilization).
  const idle = workerTasksUsage.elu.idle + message.elu.idle
  const active = workerTasksUsage.elu.active + message.elu.active
  const total = idle + active
  workerTasksUsage.elu = {
    idle,
    active,
    utilization: total > 0 ? active / total : 0
  }
}
546
/**
 * Picks the worker node that will receive the next task.
 *
 * A dynamic pool that is busy but not yet full gets a freshly created worker;
 * otherwise the choice is delegated to the worker choice strategy context.
 *
 * @returns The worker node key
 */
protected chooseWorkerNode (): number {
  const mustCreateWorker =
    this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
  if (!mustCreateWorker) {
    return this.workerChoiceStrategyContext.execute()
  }
  const workerCreated = this.createAndSetupWorker()
  this.registerWorkerMessageListener(workerCreated, message => {
    const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated)
    const mustDestroyWorker =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (message.kill != null &&
        this.workerNodes[currentWorkerNodeKey].tasksUsage.running === 0)
    if (!mustDestroyWorker) {
      return
    }
    // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
    this.flushTasksQueue(currentWorkerNodeKey)
    // FIXME: wait for tasks to be finished
    void (this.destroyWorker(workerCreated) as Promise<void>)
  })
  return this.getWorkerNodeKey(workerCreated)
}
577
280c2a77 578 /**
675bb809 579 * Sends a message to the given worker.
280c2a77 580 *
38e795c1
JB
581 * @param worker - The worker which should receive the message.
582 * @param message - The message.
280c2a77
S
583 */
584 protected abstract sendToWorker (
585 worker: Worker,
586 message: MessageValue<Data>
587 ): void
588
4a6952ff 589 /**
f06e48d8 590 * Registers a listener callback on the given worker.
4a6952ff 591 *
38e795c1
JB
592 * @param worker - The worker which should register a listener.
593 * @param listener - The message listener callback.
4a6952ff
JB
594 */
595 protected abstract registerWorkerMessageListener<
4f7fa42a 596 Message extends Data | Response
78cea37e 597 >(worker: Worker, listener: (message: MessageValue<Message>) => void): void
c97c7edb 598
729c563d
S
599 /**
600 * Returns a newly created worker.
601 */
280c2a77 602 protected abstract createWorker (): Worker
c97c7edb 603
729c563d 604 /**
f06e48d8 605 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
729c563d 606 *
38e795c1 607 * Can be used to update the `maxListeners` or binding the `main-worker`\<-\>`worker` connection if not bind by default.
729c563d 608 *
38e795c1 609 * @param worker - The newly created worker.
729c563d 610 */
280c2a77 611 protected abstract afterWorkerSetup (worker: Worker): void
c97c7edb 612
/**
 * Creates a worker, attaches its event handlers and registers it in the pool worker nodes.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()
  const {
    messageHandler,
    errorHandler,
    onlineHandler,
    exitHandler,
    restartWorkerOnError
  } = this.opts

  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  // Forward worker errors to the pool emitter, when there is one.
  worker.on('error', error => {
    this.emitter?.emit(PoolEvents.error, error)
  })
  // Optionally spawn a replacement worker each time this one errors.
  if (restartWorkerOnError === true) {
    worker.on('error', () => {
      this.createAndSetupWorker()
    })
  }
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  // An exited worker is removed from the pool worker nodes.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)

  this.afterWorkerSetup(worker)

  return worker
}
be0676b3
APA
645
/**
 * Builds the listener registered for each worker message.
 *
 * The listener settles the promise bound to the message id, runs the after
 * task execution hook and, when the tasks queue is enabled, starts the next
 * task queued on the same worker node.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    // Only messages carrying a task id are task execution responses.
    if (message.id == null) {
      return
    }
    const promiseResponse = this.promiseResponseMap.get(message.id)
    if (promiseResponse == null) {
      return
    }
    // Drop the map entry right away so that it cannot leak if a later step throws.
    this.promiseResponseMap.delete(message.id)
    if (message.error != null) {
      promiseResponse.reject(message.error)
      if (this.emitter != null) {
        this.emitter.emit(PoolEvents.taskError, {
          error: message.error,
          errorData: message.errorData
        })
      }
    } else {
      promiseResponse.resolve(message.data as Response)
    }
    this.afterTaskExecutionHook(promiseResponse.worker, message)
    const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
    // A key of `-1` means the worker already left the pool: it has no queue to drain.
    if (
      workerNodeKey !== -1 &&
      this.opts.enableTasksQueue === true &&
      this.tasksQueueSize(workerNodeKey) > 0
    ) {
      this.executeTask(
        workerNodeKey,
        this.dequeueTask(workerNodeKey) as Task<Data>
      )
    }
  }
}
7c0ba920 684
// Emits the `busy` and `full` pool events when the pool has an emitter.
private checkAndEmitEvents (): void {
  const { emitter } = this
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  // The `full` event only applies to dynamic pools.
  if (this.type === PoolTypes.dynamic && this.full) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
695
0ebe2a9f
JB
/**
 * Replaces the tasks usage of the given worker node.
 *
 * @param workerNode - The worker node.
 * @param tasksUsage - The worker node tasks usage.
 */
private setWorkerNodeTasksUsage (
  workerNode: WorkerNode<Worker, Data>,
  tasksUsage: TasksUsage
): void {
  workerNode.tasksUsage = tasksUsage
}
708
/**
 * Appends a worker node for the given worker, with zeroed tasks usage and an empty tasks queue.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private pushWorkerNode (worker: Worker): number {
  const tasksUsage: TasksUsage = {
    ran: 0,
    running: 0,
    runTime: 0,
    runTimeHistory: new CircularArray(),
    avgRunTime: 0,
    medRunTime: 0,
    waitTime: 0,
    waitTimeHistory: new CircularArray(),
    avgWaitTime: 0,
    medWaitTime: 0,
    error: 0,
    elu: undefined
  }
  const tasksQueue = new Queue<Task<Data>>()
  return this.workerNodes.push({ worker, tasksUsage, tasksQueue })
}
c923ce56
JB
735
/**
 * Stores a worker node at the given key of the pool worker nodes.
 *
 * @param workerNodeKey - The worker node key.
 * @param worker - The worker.
 * @param tasksUsage - The worker tasks usage.
 * @param tasksQueue - The worker task queue.
 */
private setWorkerNode (
  workerNodeKey: number,
  worker: Worker,
  tasksUsage: TasksUsage,
  tasksQueue: Queue<Task<Data>>
): void {
  const workerNode: WorkerNode<Worker, Data> = {
    worker,
    tasksUsage,
    tasksQueue
  }
  this.workerNodes[workerNodeKey] = workerNode
}
51fe3d3c
JB
756
/**
 * Removes the worker node of the given worker from the pool worker nodes and
 * from the worker choice strategy context. Does nothing for an unknown worker.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
adc3c320 769
// Runs the before task execution hook, then sends the task to the worker of the given node.
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
774
// Adds the task to the tasks queue of the given worker node and returns the value given back by `enqueue`.
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.enqueue(task)
}
778
// Takes the next task out of the tasks queue of the given worker node, if any.
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.dequeue()
}
782
// Returns the tasks queue size of the given worker node.
private tasksQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.size
}
ff733df7 786
416fd65c
JB
// Executes every task queued on the given worker node until its queue is empty.
private flushTasksQueue (workerNodeKey: number): void {
  // Loop on the live queue size: each dequeue shrinks the queue, so comparing an
  // increasing index against the size would stop after about half of the tasks.
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
797
ef41a6e6
JB
// Flushes the tasks queue of every worker node of the pool.
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 803}