+ private checkAndEmitEvents (): void {
+ if (this.emitter != null) {
+ if (this.busy) {
+ this.emitter?.emit(PoolEvents.busy, this.info)
+ }
+ if (this.type === PoolTypes.dynamic && this.full) {
+ this.emitter?.emit(PoolEvents.full, this.info)
+ }
+ }
+ }
+
+ /**
+ * Sets the given worker node its tasks usage in the pool.
+ *
+ * @param workerNode - The worker node.
+ * @param workerUsage - The worker usage.
+ */
+ private setWorkerNodeTasksUsage (
+ workerNode: WorkerNode<Worker, Data>,
+ workerUsage: WorkerUsage
+ ): void {
+ workerNode.workerUsage = workerUsage
+ }
+
+ /**
+ * Pushes the given worker in the pool worker nodes.
+ *
+ * @param worker - The worker.
+ * @returns The worker nodes length.
+ */
+ private pushWorkerNode (worker: Worker): number {
+ return this.workerNodes.push({
+ worker,
+ workerUsage: this.getWorkerUsage(worker),
+ tasksQueue: new Queue<Task<Data>>()
+ })
+ }
+
+ // /**
+ // * Sets the given worker in the pool worker nodes.
+ // *
+ // * @param workerNodeKey - The worker node key.
+ // * @param worker - The worker.
+ // * @param workerUsage - The worker usage.
+ // * @param tasksQueue - The worker task queue.
+ // */
+ // private setWorkerNode (
+ // workerNodeKey: number,
+ // worker: Worker,
+ // workerUsage: WorkerUsage,
+ // tasksQueue: Queue<Task<Data>>
+ // ): void {
+ // this.workerNodes[workerNodeKey] = {
+ // worker,
+ // workerUsage,
+ // tasksQueue
+ // }
+ // }
+
+ /**
+ * Removes the given worker from the pool worker nodes.
+ *
+ * @param worker - The worker.
+ */
+ private removeWorkerNode (worker: Worker): void {
+ const workerNodeKey = this.getWorkerNodeKey(worker)
+ if (workerNodeKey !== -1) {
+ this.workerNodes.splice(workerNodeKey, 1)
+ this.workerChoiceStrategyContext.remove(workerNodeKey)
+ }
+ }
+
+ private executeTask (workerNodeKey: number, task: Task<Data>): void {
+ this.beforeTaskExecutionHook(workerNodeKey, task)
+ this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
+ }
+
+ private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
+ return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
+ }
+
+ private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
+ return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
+ }
+
+ private tasksQueueSize (workerNodeKey: number): number {
+ return this.workerNodes[workerNodeKey].tasksQueue.size
+ }
+
+ private flushTasksQueue (workerNodeKey: number): void {
+ if (this.tasksQueueSize(workerNodeKey) > 0) {
+ for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) {
+ this.executeTask(
+ workerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
+ }
+ }
+ }
+
+ private flushTasksQueues (): void {
+ for (const [workerNodeKey] of this.workerNodes.entries()) {
+ this.flushTasksQueue(workerNodeKey)
+ }
+ }
+
+ private setWorkerStatistics (worker: Worker): void {
+ this.sendToWorker(worker, {
+ statistics: {
+ runTime:
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .runTime.aggregate,
+ elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .elu
+ }
+ })
+ }
+
+ private getWorkerUsage (worker: Worker): WorkerUsage {
+ return {
+ tasks: this.getTaskStatistics(worker),
+ runTime: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: new CircularArray()
+ },
+ waitTime: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: new CircularArray()
+ },
+ elu: undefined
+ }
+ }
+
+ private getTaskStatistics (worker: Worker): TaskStatistics {
+ const queueSize =
+ this.workerNodes[this.getWorkerNodeKey(worker)]?.tasksQueue?.size
+ return {
+ executed: 0,
+ executing: 0,
+ get queued (): number {
+ return queueSize ?? 0
+ },
+ failed: 0