fix: fix pool readiness semantic
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
4 import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
7 isKillBehavior,
8 isPlainObject,
9 median,
10 round
11 } from '../utils'
12 import { KillBehaviors } from '../worker/worker-options'
13 import {
14 type IPool,
15 PoolEmitter,
16 PoolEvents,
17 type PoolInfo,
18 type PoolOptions,
19 type PoolType,
20 PoolTypes,
21 type TasksQueueOptions
22 } from './pool'
23 import type {
24 IWorker,
25 IWorkerNode,
26 MessageHandler,
27 Task,
28 WorkerInfo,
29 WorkerType,
30 WorkerUsage
31 } from './worker'
32 import {
33 Measurements,
34 WorkerChoiceStrategies,
35 type WorkerChoiceStrategy,
36 type WorkerChoiceStrategyOptions
37 } from './selection-strategies/selection-strategies-types'
38 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
39 import { version } from './version'
40 import { WorkerNode } from './worker-node'
41
42 /**
43 * Base class that implements some shared logic for all poolifier pools.
44 *
45 * @typeParam Worker - Type of worker which manages this pool.
46 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
47 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
48 */
49 export abstract class AbstractPool<
50 Worker extends IWorker,
51 Data = unknown,
52 Response = unknown
53 > implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * Pending task executions, indexed by task message id.
 *
 * - `key`: The message id of a submitted task.
 * - `value`: The worker the task was submitted to, plus the `resolve` and `reject` callbacks of the promise handed back to the caller.
 *
 * When a worker answers, the entry matching the message id is looked up to settle the corresponding promise.
 */
protected promiseResponseMap: Map<
string,
PromiseResponseWrapper<Worker, Response>
> = new Map()

/**
 * Context holding the worker choice strategy currently in use.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
Data,
Response
>

/**
 * `performance.now()` value captured at the end of the pool construction.
 */
private readonly startTimestamp
86
/**
 * Builds a poolifier pool: validates the arguments, wires the optional event
 * emitter and the worker choice strategy context, then creates the workers.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws Error if called from something else than the main worker, or if an argument is invalid.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // Arguments validation (the options check also fills in the defaults).
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // These methods are passed around as callbacks: pin their `this`.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  // Fill the pool up to the requested number of workers.
  while (this.workerNodes.length < this.numberOfWorkers) {
    this.createAndSetupWorker()
  }

  this.startTimestamp = performance.now()
}
132
/**
 * Ensures a worker file path was given.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is `null`, `undefined` or a blank string.
 */
private checkFilePath (filePath: string): void {
  const isBlankString =
    typeof filePath === 'string' && filePath.trim().length === 0
  if (filePath == null || isBlankString) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
141
/**
 * Ensures the requested number of workers is usable.
 *
 * @param numberOfWorkers - Number of workers requested.
 * @throws Error if the number is missing.
 * @throws TypeError if the number is not a safe integer.
 * @throws RangeError if the number is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
159
/**
 * Ensures the size boundaries of a dynamic pool are coherent.
 * Does nothing for any other pool type.
 *
 * @param min - Minimum pool size.
 * @param max - Maximum pool size.
 * @throws RangeError if `min > max`, if both are zero, or if `min === max`.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  // Must be tested before the generic `min === max` case to get the dedicated message.
  if (min === 0 && max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
177
/**
 * Validates the pool options and writes the default values of the missing
 * ones into `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws TypeError if the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)

  this.opts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options only matter when the tasks queue is enabled.
  if (!this.opts.enableTasksQueue) {
    return
  }
  this.checkValidTasksQueueOptions(
    opts.tasksQueueOptions as TasksQueueOptions
  )
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
    opts.tasksQueueOptions as TasksQueueOptions
  )
}
204
/**
 * Ensures the given strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws Error if the strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
214
/**
 * Validates worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The options to check.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if `weights` does not hold exactly `maxSize` entries, or if `measurement` is not one of the `Measurements` values.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }

  const { weights, measurement } = workerChoiceStrategyOptions

  // One weight is expected per worker node the pool may hold.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }

  if (measurement != null && !Object.values(Measurements).includes(measurement)) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
242
/**
 * Validates tasks queue options. Missing options and a missing
 * `concurrency` are both accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws TypeError if the options are not a plain object or `concurrency` is not a safe integer.
 * @throws Error if `concurrency` is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }

  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
266
/**
 * @inheritDoc
 *
 * Task counters are summed over all worker nodes. `utilization`, `runTime`
 * and `waitTime` are only present when the current worker choice strategy
 * requires the matching statistics. Averages are reported as `0` (not `NaN`)
 * as long as no task has been executed.
 */
public get info (): PoolInfo {
  const taskStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()

  // Sums a per worker node number over all the worker nodes.
  const sumOverWorkerNodes = (
    selector: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + selector(workerNode),
      0
    )

  // Counts the worker nodes matching a predicate.
  const countWorkerNodes = (
    predicate: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number => this.workerNodes.filter(predicate).length

  const executedTasks = sumOverWorkerNodes(
    workerNode => workerNode.usage.tasks.executed
  )

  // Guards the division: without any executed task the average would be 0 / 0 = NaN.
  const averagePerExecutedTask = (aggregate: number): number =>
    executedTasks > 0 ? aggregate / executedTasks : 0

  // Builds the pool wide statistics of one time measurement.
  // eslint-disable-next-line @typescript-eslint/explicit-function-return-type
  const buildMeasurementInfo = (measurement: 'runTime' | 'waitTime') => ({
    minimum: round(
      Math.min(
        ...this.workerNodes.map(
          workerNode => workerNode.usage[measurement]?.minimum ?? Infinity
        )
      )
    ),
    maximum: round(
      Math.max(
        ...this.workerNodes.map(
          workerNode => workerNode.usage[measurement]?.maximum ?? -Infinity
        )
      )
    ),
    average: round(
      averagePerExecutedTask(
        sumOverWorkerNodes(
          workerNode => workerNode.usage[measurement]?.aggregate ?? 0
        )
      )
    ),
    ...(taskStatisticsRequirements[measurement].median && {
      median: round(
        median(
          this.workerNodes.map(
            workerNode => workerNode.usage[measurement]?.median ?? 0
          )
        )
      )
    })
  })

  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization needs both the run time and the wait time aggregates.
    ...(taskStatisticsRequirements.runTime.aggregate &&
      taskStatisticsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing > 0
    ),
    executedTasks,
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    queuedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.queued
    ),
    maxQueuedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.maxQueued
    ),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(taskStatisticsRequirements.runTime.aggregate && {
      runTime: buildMeasurementInfo('runTime')
    }),
    ...(taskStatisticsRequirements.waitTime.aggregate && {
      waitTime: buildMeasurementInfo('waitTime')
    })
  }
}
403
/**
 * Whether the pool is still starting: it holds less than `minSize` worker
 * nodes, or at least one of its worker nodes is not ready yet.
 */
private get starting (): boolean {
  if (this.workerNodes.length < this.minSize) {
    return true
  }
  return this.workerNodes.some(({ info }) => !info.ready)
}
411
/**
 * Whether the pool is ready: it holds at least `minSize` worker nodes and
 * every one of them is flagged ready.
 */
private get ready (): boolean {
  if (this.workerNodes.length < this.minSize) {
    return false
  }
  return this.workerNodes.every(({ info }) => info.ready)
}
418
/**
 * Approximate pool utilization: the tasks run time plus wait time summed
 * over the worker nodes, divided by the time elapsed since the pool start
 * multiplied by `maxSize`.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolRunTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksWaitTime += usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
}
439
/**
 * The pool type.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 */
protected abstract get type (): PoolType

/**
 * The type of worker managed by the pool.
 */
protected abstract get worker (): WorkerType

/**
 * The minimum number of worker nodes of the pool.
 */
protected abstract get minSize (): number

/**
 * The maximum number of worker nodes of the pool.
 */
protected abstract get maxSize (): number
461
/**
 * Looks a worker up by its id among the pool worker nodes.
 *
 * @param workerId - The worker id.
 * @returns The matching worker, `undefined` if no worker node has that id.
 */
private getWorkerById (workerId: number): Worker | undefined {
  const matchingWorkerNode = this.workerNodes.find(
    ({ info }) => info.id === workerId
  )
  return matchingWorkerNode?.worker
}
472
/**
 * Ensures a message carrying a worker id comes from a worker of this pool.
 * Messages without a worker id are accepted as is.
 *
 * @param message - The message received from a worker.
 * @throws Error if the worker id matches no worker node.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    return
  }
  if (this.getWorkerById(workerId) == null) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
483
/**
 * Finds the key (index in `workerNodes`) of the worker node wrapping the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  return this.workerNodes.findIndex(
    ({ worker: candidate }) => candidate === worker
  )
}
495
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Start over with fresh usage data and tell each worker which statistics
  // the new strategy needs.
  this.workerNodes.forEach(workerNode => {
    workerNode.resetUsage()
    this.setWorkerStatistics(workerNode.worker)
  })
}
514
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first so that invalid options never reach the pool options.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(workerChoiceStrategyOptions)
}
525
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  // Switching the queue off: submit what is still queued first.
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
537
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue === true) {
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
    return
  }
  // Tasks queue disabled: drop any previously stored options.
  if (this.opts.tasksQueueOptions != null) {
    delete this.opts.tasksQueueOptions
  }
}
548
/**
 * Builds the effective tasks queue options, defaulting `concurrency` to `1`.
 *
 * @param tasksQueueOptions - The user supplied tasks queue options, possibly missing.
 * @returns The tasks queue options with every default applied.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
556
/**
 * Whether the pool is full or not, i.e. whether it holds at least `maxSize` worker nodes.
 *
 * The pool filling boolean status.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
565
/**
 * Whether the pool is busy or not. Decided by the concrete pool implementation.
 *
 * The pool busyness boolean status.
 */
protected abstract get busy (): boolean
572
/**
 * Whether no worker node is idle, i.e. none of them has zero executing task.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  return this.workerNodes.every(
    ({ usage }) => usage.tasks.executing !== 0
  )
}
585
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  const timestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  const submittedTask: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp,
    workerId,
    id: randomUUID()
  }
  // Register the promise callbacks so the worker response can settle the
  // promise later on.
  const responsePromise = new Promise<Response>((resolve, reject) => {
    const { worker } = this.workerNodes[workerNodeKey]
    this.promiseResponseMap.set(submittedTask.id as string, {
      resolve,
      reject,
      worker
    })
  })
  // With the tasks queue enabled, the task is queued when the pool is busy
  // or when the chosen worker node already runs `concurrency` tasks.
  const shallEnqueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      this.workerNodes[workerNodeKey].usage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (shallEnqueue) {
    this.enqueueTask(workerNodeKey, submittedTask)
  } else {
    this.executeTask(workerNodeKey, submittedTask)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return responsePromise
}
620
/**
 * @inheritDoc
 *
 * For each worker node: flushes its tasks queue, asks for the worker
 * destruction, then waits for the worker `'exit'` event.
 */
public async destroy (): Promise<void> {
  await Promise.all(
    this.workerNodes.map(async (workerNode, workerNodeKey) => {
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      const workerExitPromise = new Promise<void>(resolve => {
        // `once`: the listener only serves to resolve this promise, so it
        // must not stay registered on the worker afterwards.
        workerNode.worker.once('exit', () => {
          resolve()
        })
      })
      await this.destroyWorker(workerNode.worker)
      await workerExitPromise
    })
  )
}
637
/**
 * Terminates the given worker. May complete synchronously or return a promise.
 *
 * @param worker - A worker within `workerNodes`.
 */
protected abstract destroyWorker (worker: Worker): void | Promise<void>
644
/**
 * Hook called by the abstract constructor right before the worker nodes are created.
 * Does nothing by default; can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
654
/**
 * Should return whether the worker is the main worker or not.
 * The constructor refuses to build a pool when this returns `false`.
 */
protected abstract isMain (): boolean
659
/**
 * Hook run right before a task is sent to a worker: counts the task as
 * executing and records its wait time.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const { usage } = this.workerNodes[workerNodeKey]
  usage.tasks.executing++
  this.updateWaitTimeWorkerUsage(usage, task)
}
675
/**
 * Hook run once a task response has been received: updates the tasks
 * counters, the run time and the ELU usage of the worker node.
 * Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const { usage } = this.workerNodes[workerNodeKey]
  this.updateTaskStatisticsWorkerUsage(usage, message)
  this.updateRunTimeWorkerUsage(usage, message)
  this.updateEluWorkerUsage(usage, message)
}
692
/**
 * Moves one task from "executing" to either "executed" or "failed".
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The task response; a non-null `taskError` marks a failure.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  tasks.executing--
  if (message.taskError != null) {
    tasks.failed++
  } else {
    tasks.executed++
  }
}
705
/**
 * Updates the run time statistics of a worker usage with a task response.
 * Nothing is updated unless the worker choice strategy requires the run
 * time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The task response carrying the task performance.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  if (!runTimeRequirements.aggregate) {
    return
  }
  const { runTime, tasks } = workerUsage
  const measuredRunTime = message.taskPerformance?.runTime
  // A missing measurement counts as zero in aggregate, minimum and maximum.
  const taskRunTime = measuredRunTime ?? 0
  runTime.aggregate = (runTime.aggregate ?? 0) + taskRunTime
  runTime.minimum = Math.min(taskRunTime, runTime.minimum ?? Infinity)
  runTime.maximum = Math.max(taskRunTime, runTime.maximum ?? -Infinity)
  if (runTimeRequirements.average && tasks.executed !== 0) {
    runTime.average = runTime.aggregate / tasks.executed
  }
  // The median history only records actual measurements.
  if (runTimeRequirements.median && measuredRunTime != null) {
    runTime.history.push(measuredRunTime)
    runTime.median = median(runTime.history)
  }
}
743
/**
 * Updates the wait time statistics of a worker usage with the time elapsed
 * since the task timestamp. Nothing is updated unless the worker choice
 * strategy requires the wait time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  // A task without timestamp has a zero wait time.
  const taskWaitTime = now - (task.timestamp ?? now)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  if (!waitTimeRequirements.aggregate) {
    return
  }
  const { waitTime, tasks } = workerUsage
  waitTime.aggregate = (waitTime.aggregate ?? 0) + taskWaitTime
  waitTime.minimum = Math.min(taskWaitTime, waitTime.minimum ?? Infinity)
  waitTime.maximum = Math.max(taskWaitTime, waitTime.maximum ?? -Infinity)
  if (waitTimeRequirements.average && tasks.executed !== 0) {
    waitTime.average = waitTime.aggregate / tasks.executed
  }
  if (waitTimeRequirements.median) {
    waitTime.history.push(taskWaitTime)
    waitTime.median = median(waitTime.history)
  }
}
781
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage
 * with a task response. Nothing is updated unless the worker choice
 * strategy requires the ELU aggregate and the response carries ELU data.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The task response carrying the task performance.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  if (!eluRequirements.aggregate) {
    return
  }
  const taskElu = message.taskPerformance?.elu
  if (taskElu == null) {
    return
  }
  const { elu, tasks } = workerUsage
  const { idle, active } = elu

  idle.aggregate = (idle.aggregate ?? 0) + taskElu.idle
  active.aggregate = (active.aggregate ?? 0) + taskElu.active

  // The first sample is taken as is, later ones are averaged with the
  // current value.
  elu.utilization =
    elu.utilization != null
      ? (elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization

  idle.minimum = Math.min(taskElu.idle, idle.minimum ?? Infinity)
  idle.maximum = Math.max(taskElu.idle, idle.maximum ?? -Infinity)
  active.minimum = Math.min(taskElu.active, active.minimum ?? Infinity)
  active.maximum = Math.max(taskElu.active, active.maximum ?? -Infinity)

  if (eluRequirements.average && tasks.executed !== 0) {
    idle.average = idle.aggregate / tasks.executed
    active.average = active.aggregate / tasks.executed
  }
  if (eluRequirements.median) {
    idle.history.push(taskElu.idle)
    active.history.push(taskElu.active)
    idle.median = median(idle.history)
    active.median = median(active.history)
  }
}
845
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker is created first when the conditions are met; it gets
 * the task when the strategy policy asks to use dynamic workers. In every
 * other case the worker choice strategy decides.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
864
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full,
 * and none of its worker nodes is idle.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
873
/**
 * Sends a message to the given worker. The transport is up to the concrete pool.
 *
 * @param worker - The worker which should receive the message.
 * @param message - The message.
 */
protected abstract sendToWorker (
  worker: Worker,
  message: MessageValue<Data>
): void
884
/**
 * Subscribes a callback to the `'message'` event of the given worker.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
private registerWorkerMessageListener<Message extends Data | Response>(
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void {
  const messageHandler = listener as MessageHandler<Worker>
  worker.on('message', messageHandler)
}
897
/**
 * Creates a new worker. The worker is not yet part of the pool worker nodes.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
904
/**
 * Hook run once a newly created worker has been added to the pool worker
 * nodes: starts listening to its messages, sends it the startup message
 * and tells it which task statistics to compute.
 * Can be overridden.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  // Listen to worker messages.
  const listener = this.workerListener()
  this.registerWorkerMessageListener(worker, listener)
  // Send startup message to worker.
  const workerNodeKey = this.getWorkerNodeKey(worker)
  this.sendToWorker(worker, {
    ready: false,
    workerId: this.getWorkerInfo(workerNodeKey).id as number
  })
  // Setup worker task statistics computation.
  this.setWorkerStatistics(worker)
}
922
/**
 * Creates a new worker, registers the user supplied and the pool internal
 * event handlers on it, and adds it to the pool worker nodes.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()

  // Pool internal reaction to a worker error.
  const handleWorkerError = (error: Error): void => {
    this.emitter?.emit(PoolEvents.error, error)
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(worker)
    }
    // No replacement worker is created while the pool is still starting.
    if (this.opts.restartWorkerOnError === true && !this.starting) {
      const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
      if (workerInfo.dynamic) {
        this.createAndSetupDynamicWorker()
      } else {
        this.createAndSetupWorker()
      }
    }
  }

  // Registration order is kept: user handlers run before the pool ones.
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', handleWorkerError)
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)

  this.afterWorkerSetup(worker)

  return worker
}
960
/**
 * Moves the tasks queued on the given worker to the other worker nodes.
 *
 * Each task goes to the first other worker node with an empty queue, or
 * else to the other worker node with the fewest queued tasks.
 *
 * Nothing is moved when the worker is not part of the pool worker nodes
 * or when it is the only worker node. Moving a task back onto the node it
 * was just dequeued from would never empty the queue.
 *
 * @param worker - The worker whose queued tasks shall be redistributed.
 */
private redistributeQueuedTasks (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1 || this.workerNodes.length <= 1) {
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey = -1
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      if (workerNodeId === workerNodeKey) {
        continue
      }
      const queuedTasks = workerNode.usage.tasks.queued
      if (queuedTasks === 0) {
        // An empty queue cannot be beaten: stop searching.
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (queuedTasks < minQueuedTasks) {
        minQueuedTasks = queuedTasks
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey === -1) {
      // No eligible target: leave the remaining tasks in place rather
      // than spinning forever.
      break
    }
    this.enqueueTask(
      targetWorkerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
988
/**
 * Creates a new worker, sets it up completely in the pool worker nodes,
 * flags it as dynamic and registers the listener destroying it when it
 * sends a kill message.
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    // A hard kill is always obeyed; any other kill message only once the
    // worker node has nothing executing (nor queued, with the tasks queue).
    const shallDestroyWorker = (): boolean => {
      if (isKillBehavior(KillBehaviors.HARD, message.kill)) {
        return true
      }
      if (message.kill == null) {
        return false
      }
      if (this.opts.enableTasksQueue === false) {
        return this.workerNodes[workerNodeKey].usage.tasks.executing === 0
      }
      if (this.opts.enableTasksQueue === true) {
        return (
          this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
          this.tasksQueueSize(workerNodeKey) === 0
        )
      }
      return false
    }
    if (shallDestroyWorker()) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.dynamic = true
  this.sendToWorker(worker, {
    checkAlive: true,
    workerId: workerInfo.id as number
  })
  return worker
}
1019
/**
 * Builds the listener registered for each worker message. It checks the
 * sender, then dispatches ready messages and task execution responses to
 * their handlers; any other message is ignored.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    const { ready, workerId, id } = message
    if (ready != null && workerId != null) {
      // Worker ready message received
      this.handleWorkerReadyMessage(message)
      return
    }
    if (id != null) {
      // Task execution response received
      this.handleTaskExecutionResponse(message)
    }
  }
}
1037
/**
 * Stores the readiness reported by a worker, then emits the pool `ready`
 * event if the whole pool is ready and events are enabled.
 *
 * @param message - The ready message received from the worker.
 */
private handleWorkerReadyMessage (message: MessageValue<Response>): void {
  const worker = this.getWorkerById(message.workerId) as Worker
  const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
  workerInfo.ready = message.ready as boolean
  if (this.emitter != null && this.ready) {
    this.emitter.emit(PoolEvents.ready, this.info)
  }
}
1046
/**
 * Settles the promise of the task a worker just answered, updates the
 * worker usage, starts the next queued task of that worker node if any and
 * notifies the worker choice strategy. Responses matching no pending task
 * are ignored.
 *
 * @param message - The task execution response received from the worker.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  if (message.taskError == null) {
    promiseResponse.resolve(message.data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, message.taskError)
    promiseResponse.reject(message.taskError.message)
  }
  this.afterTaskExecutionHook(promiseResponse.worker, message)
  this.promiseResponseMap.delete(taskId)
  const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
  const hasQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0
  if (hasQueuedTask) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
1073
/**
 * Emits the `busy` event when the pool is busy and the `full` event when a
 * dynamic pool is full. Does nothing when events are disabled.
 */
private checkAndEmitEvents (): void {
  const { emitter } = this
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  if (this.type === PoolTypes.dynamic && this.full) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
1084
/**
 * Gets the information of the worker held by a worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The `info` object of the worker node.
 */
private getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const { info } = this.workerNodes[workerNodeKey]
  return info
}
1093
/**
 * Wraps the given worker in a new worker node appended to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private pushWorkerNode (worker: Worker): number {
  const workerNode: IWorkerNode<Worker, Data> = new WorkerNode(
    worker,
    this.worker
  )
  return this.workerNodes.push(workerNode)
}
1103
/**
 * Removes the worker node of the given worker from the pool worker nodes
 * and from the worker choice strategy context. Unknown workers are ignored.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
1116
/**
 * Runs the before-execution hook, then sends the task to the worker of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
1121
/**
 * Adds a task to the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to queue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}
1125
/**
 * Takes the next task out of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, possibly `undefined`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1129
/**
 * Gets the size of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1133
/**
 * Submits every task queued on the given worker node for execution, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  for (
    let remainingTasks = this.tasksQueueSize(workerNodeKey);
    remainingTasks > 0;
    remainingTasks = this.tasksQueueSize(workerNodeKey)
  ) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1143
/**
 * Flushes the tasks queue of every worker node.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1149
/**
 * Tells the given worker which task statistics (run time, ELU) the current
 * worker choice strategy requires.
 *
 * @param worker - The worker to send the statistics requirements to.
 */
private setWorkerStatistics (worker: Worker): void {
  const taskStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const statistics = {
    runTime: taskStatisticsRequirements.runTime.aggregate,
    elu: taskStatisticsRequirements.elu.aggregate
  }
  const workerNodeKey = this.getWorkerNodeKey(worker)
  this.sendToWorker(worker, {
    statistics,
    workerId: this.getWorkerInfo(workerNodeKey).id as number
  })
}
1162 }