+ /**
+ * Number of tasks queued in the pool.
+ */
+ private get numberOfQueuedTasks (): number {
+ if (this.opts.enableTasksQueue === false) {
+ return 0
+ }
+ return this.workerNodes.reduce(
+ (accumulator, workerNode) => accumulator + workerNode.tasksQueue.length,
+ 0
+ )
+ }
+
+ /**
+ * Gets the given worker its worker node key.
+ *
+ * @param worker - The worker.
+ * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
+ */
+ private getWorkerNodeKey (worker: Worker): number {
+ return this.workerNodes.findIndex(
+ workerNode => workerNode.worker === worker
+ )
+ }
+
+ /** @inheritDoc */
+ public setWorkerChoiceStrategy (
+ workerChoiceStrategy: WorkerChoiceStrategy
+ ): void {
+ this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
+ this.opts.workerChoiceStrategy = workerChoiceStrategy
+ for (const workerNode of this.workerNodes) {
+ this.setWorkerNodeTasksUsage(workerNode, {
+ run: 0,
+ running: 0,
+ runTime: 0,
+ runTimeHistory: new CircularArray(),
+ avgRunTime: 0,
+ medRunTime: 0,
+ error: 0
+ })
+ }
+ this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
+ workerChoiceStrategy
+ )
+ }
+
+ /** @inheritDoc */
+ public abstract get full (): boolean
+
+ /** @inheritDoc */
+ public abstract get busy (): boolean
+
+ protected internalBusy (): boolean {
+ return (
+ this.numberOfRunningTasks >= this.numberOfWorkers &&
+ this.findFreeWorkerNodeKey() === -1
+ )
+ }
+
+ /** @inheritDoc */
+ public findFreeWorkerNodeKey (): number {
+ return this.workerNodes.findIndex(workerNode => {
+ return workerNode.tasksUsage?.running === 0
+ })
+ }
+
+ /** @inheritDoc */
+ public async execute (data: Data): Promise<Response> {
+ const [workerNodeKey, workerNode] = this.chooseWorkerNode()
+ const submittedTask: Task<Data> = {
+ // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
+ data: data ?? ({} as Data),
+ id: crypto.randomUUID()
+ }
+ const res = this.internalExecute(workerNodeKey, workerNode, submittedTask)
+ if (
+ this.opts.enableTasksQueue === true &&
+ (this.busy || this.workerNodes[workerNodeKey].tasksUsage.running > 0)
+ ) {
+ this.enqueueTask(workerNodeKey, submittedTask)
+ } else {
+ this.sendToWorker(workerNode.worker, submittedTask)
+ }
+ this.checkAndEmitEvents()
+ // eslint-disable-next-line @typescript-eslint/return-await
+ return res
+ }
+
+ /** @inheritDoc */
+ public async destroy (): Promise<void> {
+ await Promise.all(
+ this.workerNodes.map(async workerNode => {
+ this.flushTasksQueueByWorker(workerNode.worker)
+ await this.destroyWorker(workerNode.worker)
+ })
+ )
+ }
+
+ /**
+ * Shutdowns the given worker.
+ *
+ * @param worker - A worker within `workerNodes`.
+ */
+ protected abstract destroyWorker (worker: Worker): void | Promise<void>
+
+ /**
+ * Setup hook to run code before worker node are created in the abstract constructor.
+ * Can be overridden
+ *
+ * @virtual
+ */