refactor: cleanup pool.execute() structure
[poolifier.git] / src / pools / abstract-pool.ts
index ddf4cbccd64f0f29e70b7f2994b4c2cc543626f3..77991d7b95b76c67816d01faab881c1711f6c788 100644 (file)
@@ -64,7 +64,7 @@ export abstract class AbstractPool<
   public readonly emitter?: PoolEmitter
 
   /**
-   * The execution response promise map.
+   * The task execution response promise map.
    *
    * - `key`: The message id of each submitted task.
    * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
@@ -516,7 +516,7 @@ export abstract class AbstractPool<
    * @param worker - The worker.
    * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
    */
-  private getWorkerNodeKey (worker: Worker): number {
+  protected getWorkerNodeKey (worker: Worker): number {
     return this.workerNodes.findIndex(
       workerNode => workerNode.worker === worker
     )
@@ -614,37 +614,35 @@ export abstract class AbstractPool<
 
   /** @inheritDoc */
   public async execute (data?: Data, name?: string): Promise<Response> {
-    const timestamp = performance.now()
-    const workerNodeKey = this.chooseWorkerNode()
-    const submittedTask: Task<Data> = {
-      name: name ?? DEFAULT_TASK_NAME,
-      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
-      data: data ?? ({} as Data),
-      timestamp,
-      workerId: this.getWorkerInfo(workerNodeKey).id as number,
-      id: randomUUID()
-    }
-    const res = new Promise<Response>((resolve, reject) => {
+    return await new Promise<Response>((resolve, reject) => {
+      const timestamp = performance.now()
+      const workerNodeKey = this.chooseWorkerNode()
+      const submittedTask: Task<Data> = {
+        name: name ?? DEFAULT_TASK_NAME,
+        // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
+        data: data ?? ({} as Data),
+        timestamp,
+        workerId: this.getWorkerInfo(workerNodeKey).id as number,
+        id: randomUUID()
+      }
       this.promiseResponseMap.set(submittedTask.id as string, {
         resolve,
         reject,
         worker: this.workerNodes[workerNodeKey].worker
       })
+      if (
+        this.opts.enableTasksQueue === true &&
+        (this.busy ||
+          this.workerNodes[workerNodeKey].usage.tasks.executing >=
+            ((this.opts.tasksQueueOptions as TasksQueueOptions)
+              .concurrency as number))
+      ) {
+        this.enqueueTask(workerNodeKey, submittedTask)
+      } else {
+        this.executeTask(workerNodeKey, submittedTask)
+      }
+      this.checkAndEmitEvents()
     })
-    if (
-      this.opts.enableTasksQueue === true &&
-      (this.busy ||
-        this.workerNodes[workerNodeKey].usage.tasks.executing >=
-          ((this.opts.tasksQueueOptions as TasksQueueOptions)
-            .concurrency as number))
-    ) {
-      this.enqueueTask(workerNodeKey, submittedTask)
-    } else {
-      this.executeTask(workerNodeKey, submittedTask)
-    }
-    this.checkAndEmitEvents()
-    // eslint-disable-next-line @typescript-eslint/return-await
-    return res
   }
 
   /** @inheritDoc */
@@ -862,6 +860,7 @@ export abstract class AbstractPool<
       const workerNodeKey = this.getWorkerNodeKey(worker)
       const workerInfo = this.getWorkerInfo(workerNodeKey)
       workerInfo.ready = false
+      this.workerNodes[workerNodeKey].closeChannel()
       this.emitter?.emit(PoolEvents.error, error)
       if (this.opts.restartWorkerOnError === true && !this.starting) {
         if (workerInfo.dynamic) {
@@ -877,11 +876,6 @@ export abstract class AbstractPool<
     worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
     worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
     worker.once('exit', () => {
-      const workerInfo = this.getWorkerInfoByWorker(worker)
-      if (workerInfo.messageChannel != null) {
-        workerInfo.messageChannel?.port1.close()
-        workerInfo.messageChannel?.port1.close()
-      }
       this.removeWorkerNode(worker)
     })
 
@@ -1055,6 +1049,7 @@ export abstract class AbstractPool<
    * Gets the worker information from the given worker node key.
    *
    * @param workerNodeKey - The worker node key.
+   * @returns The worker information.
    */
   private getWorkerInfo (workerNodeKey: number): WorkerInfo {
     return this.workerNodes[workerNodeKey].info
@@ -1064,9 +1059,15 @@ export abstract class AbstractPool<
    * Gets the worker information from the given worker.
    *
    * @param worker - The worker.
+   * @returns The worker information.
+   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker is not found.
    */
   protected getWorkerInfoByWorker (worker: Worker): WorkerInfo {
-    return this.workerNodes[this.getWorkerNodeKey(worker)].info
+    const workerNodeKey = this.getWorkerNodeKey(worker)
+    if (workerNodeKey === -1) {
+      throw new Error('Worker not found')
+    }
+    return this.workerNodes[workerNodeKey].info
   }
 
   /**