+ /** @inheritDoc */
+ public get info (): PoolInfo {
+ return {
+ version,
+ type: this.type,
+ worker: this.worker,
+ ready: this.ready,
+ strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
+ minSize: this.minSize,
+ maxSize: this.maxSize,
+ ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .runTime.aggregate &&
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .waitTime.aggregate && { utilization: round(this.utilization) }),
+ workerNodes: this.workerNodes.length,
+ idleWorkerNodes: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ workerNode.usage.tasks.executing === 0
+ ? accumulator + 1
+ : accumulator,
+ 0
+ ),
+ busyWorkerNodes: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
+ 0
+ ),
+ executedTasks: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + workerNode.usage.tasks.executed,
+ 0
+ ),
+ executingTasks: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + workerNode.usage.tasks.executing,
+ 0
+ ),
+ ...(this.opts.enableTasksQueue === true && {
+ queuedTasks: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + workerNode.usage.tasks.queued,
+ 0
+ )
+ }),
+ ...(this.opts.enableTasksQueue === true && {
+ maxQueuedTasks: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
+ 0
+ )
+ }),
+ failedTasks: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + workerNode.usage.tasks.failed,
+ 0
+ ),
+ ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .runTime.aggregate && {
+ runTime: {
+ minimum: round(
+ Math.min(
+ ...this.workerNodes.map(
+ workerNode => workerNode.usage.runTime?.minimum ?? Infinity
+ )
+ )
+ ),
+ maximum: round(
+ Math.max(
+ ...this.workerNodes.map(
+ workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
+ )
+ )
+ ),
+ average: round(
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
+ 0
+ ) /
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + (workerNode.usage.tasks?.executed ?? 0),
+ 0
+ )
+ ),
+ ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .runTime.median && {
+ median: round(
+ median(
+ this.workerNodes.map(
+ workerNode => workerNode.usage.runTime?.median ?? 0
+ )
+ )
+ )
+ })
+ }
+ }),
+ ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .waitTime.aggregate && {
+ waitTime: {
+ minimum: round(
+ Math.min(
+ ...this.workerNodes.map(
+ workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
+ )
+ )
+ ),
+ maximum: round(
+ Math.max(
+ ...this.workerNodes.map(
+ workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
+ )
+ )
+ ),
+ average: round(
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
+ 0
+ ) /
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + (workerNode.usage.tasks?.executed ?? 0),
+ 0
+ )
+ ),
+ ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .waitTime.median && {
+ median: round(
+ median(
+ this.workerNodes.map(
+ workerNode => workerNode.usage.waitTime?.median ?? 0
+ )
+ )
+ )
+ })
+ }
+ })
+ }
+ }
+
+ /**
+ * The pool readiness boolean status.
+ */
+ private get ready (): boolean {
+ return (
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ !workerNode.info.dynamic && workerNode.info.ready
+ ? accumulator + 1
+ : accumulator,
+ 0
+ ) >= this.minSize
+ )
+ }
+
+ /**
+ * The approximate pool utilization.
+ *
+ * @returns The pool utilization.
+ */
+ private get utilization (): number {
+ const poolTimeCapacity =
+ (performance.now() - this.startTimestamp) * this.maxSize
+ const totalTasksRunTime = this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
+ 0
+ )
+ const totalTasksWaitTime = this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
+ 0
+ )
+ return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
+ }
+
+ /**
+ * The pool type.
+ *
+ * If it is `'dynamic'`, it provides the `max` property.
+ */
+ protected abstract get type (): PoolType
+
+ /**
+ * The worker type.
+ */
+ protected abstract get worker (): WorkerType
+
+ /**
+ * The pool minimum size.
+ */
+ protected abstract get minSize (): number
+
+ /**
+ * The pool maximum size.
+ */
+ protected abstract get maxSize (): number
+
+ /**
+ * Checks if the worker id sent in the received message from a worker is valid.
+ *
+ * @param message - The received message.
+ * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
+ */
+ private checkMessageWorkerId (message: MessageValue<Response>): void {
+ if (
+ message.workerId != null &&
+ this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
+ ) {
+ throw new Error(
+ `Worker message received from unknown worker '${message.workerId}'`
+ )
+ }
+ }
+
+ /**
+ * Gets the given worker its worker node key.
+ *
+ * @param worker - The worker.
+ * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
+ */
+ private getWorkerNodeKeyByWorker (worker: Worker): number {
+ return this.workerNodes.findIndex(
+ workerNode => workerNode.worker === worker
+ )
+ }
+
+ /**
+ * Gets the worker node key given its worker id.
+ *
+ * @param workerId - The worker id.
+ * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
+ */
+ private getWorkerNodeKeyByWorkerId (workerId: number): number {
+ return this.workerNodes.findIndex(
+ workerNode => workerNode.info.id === workerId
+ )