+ /** @inheritDoc */
+ public abstract get type (): PoolType
+
+ /**
+ * Number of tasks concurrently running in the pool.
+ */
+ private get numberOfRunningTasks (): number {
+ return this.promiseResponseMap.size
+ }
+
+ /**
+ * Gets the given worker its worker node key.
+ *
+ * @param worker - The worker.
+ * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
+ */
+ private getWorkerNodeKey (worker: Worker): number {
+ return this.workerNodes.findIndex(
+ workerNode => workerNode.worker === worker
+ )
+ }
+
+ /** @inheritDoc */
+ public setWorkerChoiceStrategy (
+ workerChoiceStrategy: WorkerChoiceStrategy
+ ): void {
+ this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
+ this.opts.workerChoiceStrategy = workerChoiceStrategy
+ for (const [index, workerNode] of this.workerNodes.entries()) {
+ this.setWorkerNode(
+ index,
+ workerNode.worker,
+ {
+ run: 0,
+ running: 0,
+ runTime: 0,
+ runTimeHistory: new CircularArray(),
+ avgRunTime: 0,
+ medRunTime: 0,
+ error: 0
+ },
+ workerNode.tasksQueue
+ )
+ }
+ this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
+ workerChoiceStrategy
+ )
+ }
+
+ /** @inheritDoc */
+ public abstract get full (): boolean
+
+ /** @inheritDoc */
+ public abstract get busy (): boolean
+
+ protected internalBusy (): boolean {
+ return (
+ this.numberOfRunningTasks >= this.numberOfWorkers &&
+ this.findFreeWorkerNodeKey() === -1
+ )
+ }
+
+ /** @inheritDoc */
+ public findFreeWorkerNodeKey (): number {
+ return this.workerNodes.findIndex(workerNode => {
+ return workerNode.tasksUsage?.running === 0
+ })
+ }
+
+ /** @inheritDoc */
+ public async execute (data: Data): Promise<Response> {
+ const [workerNodeKey, workerNode] = this.chooseWorkerNode()
+ const submittedTask: Task<Data> = {
+ // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
+ data: data ?? ({} as Data),
+ id: crypto.randomUUID()
+ }
+ const res = this.internalExecute(workerNodeKey, workerNode, submittedTask)
+ let currentTask: Task<Data>
+ // FIXME: Add sensible conditions to start tasks queuing on the worker node.
+ if (this.tasksQueueLength(workerNodeKey) > 0) {
+ currentTask = this.dequeueTask(workerNodeKey) as Task<Data>
+ this.enqueueTask(workerNodeKey, submittedTask)
+ } else {
+ currentTask = submittedTask
+ }
+ this.sendToWorker(workerNode.worker, currentTask)
+ this.checkAndEmitFull()
+ this.checkAndEmitBusy()
+ // eslint-disable-next-line @typescript-eslint/return-await
+ return res
+ }
+
+ /** @inheritDoc */
+ public async destroy (): Promise<void> {
+ await Promise.all(
+ this.workerNodes.map(async workerNode => {
+ await this.destroyWorker(workerNode.worker)
+ })
+ )