Merge branch 'master' into worker-info
authorJérôme Benoit <jerome.benoit@sap.com>
Sat, 1 Jul 2023 22:05:41 +0000 (00:05 +0200)
committerGitHub <noreply@github.com>
Sat, 1 Jul 2023 22:05:41 +0000 (00:05 +0200)
src/pools/abstract-pool.ts
src/pools/worker.ts
src/utility-types.ts
src/worker/abstract-worker.ts
src/worker/cluster-worker.ts
src/worker/thread-worker.ts
tests/pools/abstract/abstract-pool.test.js

index 0f18de3fba17f63f0f6b0b8c76697852cc085baa..f931a46670771b6564405000ed6f96943cc29d5f 100644 (file)
@@ -20,7 +20,8 @@ import {
   type PoolType,
   PoolTypes,
   type TasksQueueOptions,
-  type WorkerType
+  type WorkerType,
+  WorkerTypes
 } from './pool'
 import type {
   IWorker,
@@ -244,6 +245,14 @@ export abstract class AbstractPool<
     }
   }
 
+  private get starting (): boolean {
+    return this.workerNodes.some(workerNode => !workerNode.info.started)
+  }
+
+  private get started (): boolean {
+    return this.workerNodes.some(workerNode => workerNode.info.started)
+  }
+
   /** @inheritDoc */
   public get info (): PoolInfo {
     return {
@@ -344,6 +353,17 @@ export abstract class AbstractPool<
    */
   protected abstract get maxSize (): number
 
+  /**
+   * Get the worker given its id.
+   *
+   * @param workerId - The worker id.
+   * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
+   */
+  private getWorkerById (workerId: number): Worker | undefined {
+    return this.workerNodes.find(workerNode => workerNode.info.id === workerId)
+      ?.worker
+  }
+
   /**
    * Gets the given worker its worker node key.
    *
@@ -750,7 +770,7 @@ export abstract class AbstractPool<
       if (this.emitter != null) {
         this.emitter.emit(PoolEvents.error, error)
       }
-      if (this.opts.restartWorkerOnError === true) {
+      if (this.opts.restartWorkerOnError === true && !this.starting) {
         this.createAndSetupWorker()
       }
     })
@@ -801,7 +821,16 @@ export abstract class AbstractPool<
    */
   protected workerListener (): (message: MessageValue<Response>) => void {
     return message => {
-      if (message.id != null) {
+      if (message.workerId != null && message.started != null) {
+        // Worker started message received
+        const worker = this.getWorkerById(message.workerId)
+        if (worker != null) {
+          this.workerNodes[this.getWorkerNodeKey(worker)].info.started =
+            message.started
+        } else {
+          throw new Error('Worker started message received from unknown worker')
+        }
+      } else if (message.id != null) {
         // Task execution response received
         const promiseResponse = this.promiseResponseMap.get(message.id)
         if (promiseResponse != null) {
@@ -864,6 +893,7 @@ export abstract class AbstractPool<
   private pushWorkerNode (worker: Worker): number {
     this.workerNodes.push({
       worker,
+      info: { id: this.getWorkerId(worker), started: false },
       usage: this.getWorkerUsage(),
       tasksQueue: new Queue<Task<Data>>()
     })
@@ -875,22 +905,39 @@ export abstract class AbstractPool<
     return this.workerNodes.length
   }
 
+  /**
+   * Gets the worker id.
+   *
+   * @param worker - The worker.
+   * @returns The worker id.
+   */
+  private getWorkerId (worker: Worker): number | undefined {
+    if (this.worker === WorkerTypes.thread) {
+      return worker.threadId
+    } else if (this.worker === WorkerTypes.cluster) {
+      return worker.id
+    }
+  }
+
   // /**
   //  * Sets the given worker in the pool worker nodes.
   //  *
   //  * @param workerNodeKey - The worker node key.
   //  * @param worker - The worker.
+  //  * @param workerInfo - The worker info.
   //  * @param workerUsage - The worker usage.
   //  * @param tasksQueue - The worker task queue.
   //  */
   // private setWorkerNode (
   //   workerNodeKey: number,
   //   worker: Worker,
+  //   workerInfo: WorkerInfo,
   //   workerUsage: WorkerUsage,
   //   tasksQueue: Queue<Task<Data>>
   // ): void {
   //   this.workerNodes[workerNodeKey] = {
   //     worker,
+  //     info: workerInfo,
   //     usage: workerUsage,
   //     tasksQueue
   //   }
index dc283e0eaf9298a683dcfffba8dbaabf4afb6cc2..664b6a7695ef42a088732812a7eac270e9f87a00 100644 (file)
@@ -118,6 +118,22 @@ export interface TaskStatistics {
   failed: number
 }
 
+/**
+ * Worker information.
+ *
+ * @internal
+ */
+export interface WorkerInfo {
+  /**
+   * Worker id.
+   */
+  id: number | undefined
+  /**
+   * Started flag.
+   */
+  started: boolean
+}
+
 /**
  * Worker usage statistics.
  *
@@ -146,6 +162,11 @@ export interface WorkerUsage {
  * Worker interface.
  */
 export interface IWorker {
+  /**
+   * Worker id.
+   */
+  readonly id?: number
+  readonly threadId?: number
   /**
    * Registers an event listener.
    *
@@ -177,6 +198,10 @@ export interface WorkerNode<Worker extends IWorker, Data = unknown> {
    * Worker node worker.
    */
   readonly worker: Worker
+  /**
+   * Worker node worker info.
+   */
+  info: WorkerInfo
   /**
    * Worker node worker usage statistics.
    */
index 1eb8df50ed527bf80eeff0eebee1721fe7e7f0c4..0443b4ceef2374e28ba8201bf5d1d6a61ebbce93 100644 (file)
@@ -53,6 +53,10 @@ export interface WorkerStatistics {
  */
 export interface MessageValue<Data = unknown, ErrorData = unknown>
   extends Task<Data> {
+  /**
+   * Worker id.
+   */
+  readonly workerId?: number
   /**
    * Kill code.
    */
@@ -69,6 +73,10 @@ export interface MessageValue<Data = unknown, ErrorData = unknown>
    * Whether the worker computes the given statistics or not.
    */
   readonly statistics?: WorkerStatistics
+  /**
+   * Whether the worker has started or not.
+   */
+  readonly started?: boolean
 }
 
 /**
index 2a027269f990375f2a1556372a463e5b9f726655..6e17c9ed92f53a3612d214f782d38019875ed46c 100644 (file)
@@ -36,6 +36,10 @@ export abstract class AbstractWorker<
   Data = unknown,
   Response = unknown
 > extends AsyncResource {
+  /**
+   * Worker id.
+   */
+  protected abstract id: number
   /**
    * Task function(s) processed by the worker when the pool's `execution` function is invoked.
    */
@@ -225,6 +229,7 @@ export abstract class AbstractWorker<
       this.sendToMainWorker({
         data: res,
         taskPerformance,
+        workerId: this.id,
         id: message.id
       })
     } catch (e) {
@@ -234,6 +239,7 @@ export abstract class AbstractWorker<
           message: err,
           data: message.data
         },
+        workerId: this.id,
         id: message.id
       })
     } finally {
@@ -258,6 +264,7 @@ export abstract class AbstractWorker<
         this.sendToMainWorker({
           data: res,
           taskPerformance,
+          workerId: this.id,
           id: message.id
         })
         return null
@@ -269,6 +276,7 @@ export abstract class AbstractWorker<
             message: err,
             data: message.data
           },
+          workerId: this.id,
           id: message.id
         })
       })
index 13735b1d697c076c547981e9d5aad8bd22633f84..16dddd2969bd50193c4fb43e762d607f08f4ca08 100644 (file)
@@ -41,10 +41,19 @@ export class ClusterWorker<
       cluster.worker as Worker,
       opts
     )
+    if (!this.isMain) {
+      this.sendToMainWorker({ workerId: this.id, started: true })
+    }
+  }
+
+  /** @inheritDoc */
+  protected get id (): number {
+    return this.getMainWorker().id
   }
 
   /** @inheritDoc */
   protected sendToMainWorker (message: MessageValue<Response>): void {
+    console.log('sending message to main worker(cluster)', message)
     this.getMainWorker().send(message)
   }
 
index b6573a974cc653656fb6a968b0437e57eeed7dd6..7a766a958ccbb41c9b29bbe76c04890eec61604d 100644 (file)
@@ -1,4 +1,9 @@
-import { type MessagePort, isMainThread, parentPort } from 'node:worker_threads'
+import {
+  type MessagePort,
+  isMainThread,
+  parentPort,
+  threadId
+} from 'node:worker_threads'
 import type { MessageValue } from '../utility-types'
 import { AbstractWorker } from './abstract-worker'
 import type { WorkerOptions } from './worker-options'
@@ -41,10 +46,18 @@ export class ThreadWorker<
       parentPort as MessagePort,
       opts
     )
+    if (!this.isMain) {
+      this.sendToMainWorker({ workerId: this.id, started: true })
+    }
+  }
+
+  protected get id (): number {
+    return threadId
   }
 
   /** @inheritDoc */
   protected sendToMainWorker (message: MessageValue<Response>): void {
+    console.log('sending message to main worker(thread)', message)
     this.getMainWorker().postMessage(message)
   }
 }
index 023b56b414c4c3453d269c5f5605d0e69a8c4275..7c42d34a99f09efd9b3314ecff6ce14ac016783f 100644 (file)
@@ -407,6 +407,9 @@ describe('Abstract pool test suite', () => {
       maxQueuedTasks: 0,
       failedTasks: 0
     })
+    for (const workerNode of pool.workerNodes) {
+      console.log('thread:workerNode.info', workerNode.info)
+    }
     await pool.destroy()
     pool = new DynamicClusterPool(
       numberOfWorkers,
@@ -428,6 +431,9 @@ describe('Abstract pool test suite', () => {
       maxQueuedTasks: 0,
       failedTasks: 0
     })
+    for (const workerNode of pool.workerNodes) {
+      console.log('cluster:workerNode.info', workerNode.info)
+    }
     await pool.destroy()
   })