feat: add worker info to worker nodes
authorJérôme Benoit <jerome.benoit@sap.com>
Fri, 30 Jun 2023 20:31:57 +0000 (22:31 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Fri, 30 Jun 2023 20:31:57 +0000 (22:31 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
14 files changed:
src/pools/abstract-pool.ts
src/pools/selection-strategies/abstract-worker-choice-strategy.ts
src/pools/selection-strategies/least-busy-worker-choice-strategy.ts
src/pools/selection-strategies/least-elu-worker-choice-strategy.ts
src/pools/selection-strategies/least-used-worker-choice-strategy.ts
src/pools/worker.ts
src/utility-types.ts
src/worker/abstract-worker.ts
src/worker/cluster-worker.ts
src/worker/thread-worker.ts
tests/pools/abstract/abstract-pool.test.js
tests/pools/cluster/fixed.test.js
tests/pools/selection-strategies/selection-strategies.test.js
tests/pools/thread/fixed.test.js

index da0f2e77358525336a5936a42841edce9016f44b..6fde2491b6e1a565ba48ce26d821407eff67c8a2 100644 (file)
@@ -236,6 +236,14 @@ export abstract class AbstractPool<
     }
   }
 
+  private get starting (): boolean {
+    return this.workerNodes.some(workerNode => !workerNode.info.started)
+  }
+
+  private get started (): boolean {
+    return this.workerNodes.some(workerNode => workerNode.info.started)
+  }
+
   /** @inheritDoc */
   public get info (): PoolInfo {
     return {
@@ -246,41 +254,39 @@ export abstract class AbstractPool<
       workerNodes: this.workerNodes.length,
       idleWorkerNodes: this.workerNodes.reduce(
         (accumulator, workerNode) =>
-          workerNode.workerUsage.tasks.executing === 0
+          workerNode.usage.tasks.executing === 0
             ? accumulator + 1
             : accumulator,
         0
       ),
       busyWorkerNodes: this.workerNodes.reduce(
         (accumulator, workerNode) =>
-          workerNode.workerUsage.tasks.executing > 0
-            ? accumulator + 1
-            : accumulator,
+          workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
         0
       ),
       executedTasks: this.workerNodes.reduce(
         (accumulator, workerNode) =>
-          accumulator + workerNode.workerUsage.tasks.executed,
+          accumulator + workerNode.usage.tasks.executed,
         0
       ),
       executingTasks: this.workerNodes.reduce(
         (accumulator, workerNode) =>
-          accumulator + workerNode.workerUsage.tasks.executing,
+          accumulator + workerNode.usage.tasks.executing,
         0
       ),
       queuedTasks: this.workerNodes.reduce(
         (accumulator, workerNode) =>
-          accumulator + workerNode.workerUsage.tasks.queued,
+          accumulator + workerNode.usage.tasks.queued,
         0
       ),
       maxQueuedTasks: this.workerNodes.reduce(
         (accumulator, workerNode) =>
-          accumulator + workerNode.workerUsage.tasks.maxQueued,
+          accumulator + workerNode.usage.tasks.maxQueued,
         0
       ),
       failedTasks: this.workerNodes.reduce(
         (accumulator, workerNode) =>
-          accumulator + workerNode.workerUsage.tasks.failed,
+          accumulator + workerNode.usage.tasks.failed,
         0
       )
     }
@@ -308,11 +314,22 @@ export abstract class AbstractPool<
    */
   protected abstract get maxSize (): number
 
+  /**
+   * Get the worker given its id.
+   *
+   * @param workerId - The worker id.
+   * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
+   */
+  private getWorkerById (workerId: number): Worker | undefined {
+    return this.workerNodes.find(workerNode => workerNode.info.id === workerId)
+      ?.worker
+  }
+
   /**
    * Gets the given worker its worker node key.
    *
    * @param worker - The worker.
-   * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
+   * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
    */
   private getWorkerNodeKey (worker: Worker): number {
     return this.workerNodes.findIndex(
@@ -408,7 +425,7 @@ export abstract class AbstractPool<
   protected internalBusy (): boolean {
     return (
       this.workerNodes.findIndex(workerNode => {
-        return workerNode.workerUsage.tasks.executing === 0
+        return workerNode.usage.tasks.executing === 0
       }) === -1
     )
   }
@@ -434,7 +451,7 @@ export abstract class AbstractPool<
     if (
       this.opts.enableTasksQueue === true &&
       (this.busy ||
-        this.workerNodes[workerNodeKey].workerUsage.tasks.executing >=
+        this.workerNodes[workerNodeKey].usage.tasks.executing >=
           ((this.opts.tasksQueueOptions as TasksQueueOptions)
             .concurrency as number))
     ) {
@@ -491,7 +508,7 @@ export abstract class AbstractPool<
     workerNodeKey: number,
     task: Task<Data>
   ): void {
-    const workerUsage = this.workerNodes[workerNodeKey].workerUsage
+    const workerUsage = this.workerNodes[workerNodeKey].usage
     ++workerUsage.tasks.executing
     this.updateWaitTimeWorkerUsage(workerUsage, task)
   }
@@ -507,8 +524,7 @@ export abstract class AbstractPool<
     worker: Worker,
     message: MessageValue<Response>
   ): void {
-    const workerUsage =
-      this.workerNodes[this.getWorkerNodeKey(worker)].workerUsage
+    const workerUsage = this.workerNodes[this.getWorkerNodeKey(worker)].usage
     this.updateTaskStatisticsWorkerUsage(workerUsage, message)
     this.updateRunTimeWorkerUsage(workerUsage, message)
     this.updateEluWorkerUsage(workerUsage, message)
@@ -715,7 +731,7 @@ export abstract class AbstractPool<
       if (this.emitter != null) {
         this.emitter.emit(PoolEvents.error, error)
       }
-      if (this.opts.restartWorkerOnError === true) {
+      if (this.opts.restartWorkerOnError === true && !this.starting) {
         this.createAndSetupWorker()
       }
     })
@@ -747,11 +763,9 @@ export abstract class AbstractPool<
         isKillBehavior(KillBehaviors.HARD, message.kill) ||
         (message.kill != null &&
           ((this.opts.enableTasksQueue === false &&
-            this.workerNodes[workerNodeKey].workerUsage.tasks.executing ===
-              0) ||
+            this.workerNodes[workerNodeKey].usage.tasks.executing === 0) ||
             (this.opts.enableTasksQueue === true &&
-              this.workerNodes[workerNodeKey].workerUsage.tasks.executing ===
-                0 &&
+              this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
               this.tasksQueueSize(workerNodeKey) === 0)))
       ) {
         // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
@@ -768,7 +782,12 @@ export abstract class AbstractPool<
    */
   protected workerListener (): (message: MessageValue<Response>) => void {
     return message => {
-      if (message.id != null) {
+      if (message.workerId != null && message.started != null) {
+        // Worker started message received
+        this.workerNodes[
+          this.getWorkerNodeKey(this.getWorkerById(message.workerId) as Worker)
+        ].info.started = message.started
+      } else if (message.id != null) {
         // Task execution response received
         const promiseResponse = this.promiseResponseMap.get(message.id)
         if (promiseResponse != null) {
@@ -819,7 +838,7 @@ export abstract class AbstractPool<
     workerNode: WorkerNode<Worker, Data>,
     workerUsage: WorkerUsage
   ): void {
-    workerNode.workerUsage = workerUsage
+    workerNode.usage = workerUsage
   }
 
   /**
@@ -831,7 +850,8 @@ export abstract class AbstractPool<
   private pushWorkerNode (worker: Worker): number {
     this.workerNodes.push({
       worker,
-      workerUsage: this.getWorkerUsage(),
+      info: { id: worker.threadId ?? worker.id, started: false },
+      usage: this.getWorkerUsage(),
       tasksQueue: new Queue<Task<Data>>()
     })
     const workerNodeKey = this.getWorkerNodeKey(worker)
@@ -847,18 +867,21 @@ export abstract class AbstractPool<
   //  *
   //  * @param workerNodeKey - The worker node key.
   //  * @param worker - The worker.
+  //  * @param workerInfo - The worker info.
   //  * @param workerUsage - The worker usage.
   //  * @param tasksQueue - The worker task queue.
   //  */
   // private setWorkerNode (
   //   workerNodeKey: number,
   //   worker: Worker,
+  //   workerInfo: WorkerInfo,
   //   workerUsage: WorkerUsage,
   //   tasksQueue: Queue<Task<Data>>
   // ): void {
   //   this.workerNodes[workerNodeKey] = {
   //     worker,
-  //     workerUsage,
+  //     info: workerInfo,
+  //     usage: workerUsage,
   //     tasksQueue
   //   }
   // }
index 37759198c882aae1f47256a7f42e19e9261b92a8..631f286d766c5ad432d171bbb1ffda3e15d906ca 100644 (file)
@@ -152,8 +152,8 @@ export abstract class AbstractWorkerChoiceStrategy<
    */
   protected getWorkerTaskRunTime (workerNodeKey: number): number {
     return this.taskStatisticsRequirements.runTime.median
-      ? this.pool.workerNodes[workerNodeKey].workerUsage.runTime.median
-      : this.pool.workerNodes[workerNodeKey].workerUsage.runTime.average
+      ? this.pool.workerNodes[workerNodeKey].usage.runTime.median
+      : this.pool.workerNodes[workerNodeKey].usage.runTime.average
   }
 
   /**
@@ -166,8 +166,8 @@ export abstract class AbstractWorkerChoiceStrategy<
    */
   protected getWorkerTaskWaitTime (workerNodeKey: number): number {
     return this.taskStatisticsRequirements.waitTime.median
-      ? this.pool.workerNodes[workerNodeKey].workerUsage.waitTime.median
-      : this.pool.workerNodes[workerNodeKey].workerUsage.waitTime.average
+      ? this.pool.workerNodes[workerNodeKey].usage.waitTime.median
+      : this.pool.workerNodes[workerNodeKey].usage.waitTime.average
   }
 
   /**
@@ -180,8 +180,8 @@ export abstract class AbstractWorkerChoiceStrategy<
    */
   protected getWorkerTaskElu (workerNodeKey: number): number {
     return this.taskStatisticsRequirements.elu.median
-      ? this.pool.workerNodes[workerNodeKey].workerUsage.elu.active.median
-      : this.pool.workerNodes[workerNodeKey].workerUsage.elu.active.average
+      ? this.pool.workerNodes[workerNodeKey].usage.elu.active.median
+      : this.pool.workerNodes[workerNodeKey].usage.elu.active.average
   }
 
   protected computeDefaultWorkerWeight (): number {
@@ -206,7 +206,7 @@ export abstract class AbstractWorkerChoiceStrategy<
   //  */
   // private findFirstFreeWorkerNodeKey (): number {
   //   return this.pool.workerNodes.findIndex(workerNode => {
-  //     return workerNode.workerUsage.tasks.executing === 0
+  //     return workerNode.usage.tasks.executing === 0
   //   })
   // }
 
@@ -222,16 +222,14 @@ export abstract class AbstractWorkerChoiceStrategy<
   // private findLastFreeWorkerNodeKey (): number {
   //   // It requires node >= 18.0.0:
   //   // return this.workerNodes.findLastIndex(workerNode => {
-  //   //   return workerNode.workerUsage.tasks.executing === 0
+  //   //   return workerNode.usage.tasks.executing === 0
   //   // })
   //   for (
   //     let workerNodeKey = this.pool.workerNodes.length - 1;
   //     workerNodeKey >= 0;
   //     workerNodeKey--
   //   ) {
-  //     if (
-  //       this.pool.workerNodes[workerNodeKey].workerUsage.tasks.executing === 0
-  //     ) {
+  //     if (this.pool.workerNodes[workerNodeKey].usage.tasks.executing === 0) {
   //       return workerNodeKey
   //     }
   //   }
index 7e9d581421319919e865c809df85d06f3c6e305c..14aee44f97a090f5270fa23d41389e32f0c5ee8d 100644 (file)
@@ -64,8 +64,7 @@ export class LeastBusyWorkerChoiceStrategy<
     let minTime = Infinity
     for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) {
       const workerTime =
-        workerNode.workerUsage.runTime.aggregate +
-        workerNode.workerUsage.waitTime.aggregate
+        workerNode.usage.runTime.aggregate + workerNode.usage.waitTime.aggregate
       if (workerTime === 0) {
         this.nextWorkerNodeId = workerNodeKey
         break
index 9c29a30a965fa09ec78fd4f916206ceee39f4047..cbe00a4780792fdcf189d217b29ddcbe2787c853 100644 (file)
@@ -59,7 +59,7 @@ export class LeastEluWorkerChoiceStrategy<
   public choose (): number {
     let minWorkerElu = Infinity
     for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) {
-      const workerUsage = workerNode.workerUsage
+      const workerUsage = workerNode.usage
       const workerElu = workerUsage.elu?.active.aggregate ?? 0
       if (workerElu === 0) {
         this.nextWorkerNodeId = workerNodeKey
index 5c266f282d229fd5c5647e3305237ac1ee33c865..53aa05eed641d557631e56e964b7d57739a4a7f8 100644 (file)
@@ -44,7 +44,7 @@ export class LeastUsedWorkerChoiceStrategy<
   public choose (): number {
     let minNumberOfTasks = Infinity
     for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) {
-      const workerTaskStatistics = workerNode.workerUsage.tasks
+      const workerTaskStatistics = workerNode.usage.tasks
       const workerTasks =
         workerTaskStatistics.executed +
         workerTaskStatistics.executing +
index 07eabdfe4b43d28a7ecc1238b6f422be04d83ef1..c92478d4a8436512853384f65ccce026f6528a19 100644 (file)
@@ -118,6 +118,22 @@ export interface TaskStatistics {
   failed: number
 }
 
+/**
+ * Worker information.
+ *
+ * @internal
+ */
+export interface WorkerInfo {
+  /**
+   * Worker Id.
+   */
+  id: number | undefined
+  /**
+   * Started flag.
+   */
+  started: boolean
+}
+
 /**
  * Worker usage statistics.
  *
@@ -146,6 +162,11 @@ export interface WorkerUsage {
  * Worker interface.
  */
 export interface IWorker {
+  /**
+   * Worker Id.
+   */
+  id?: number
+  threadId?: number
   /**
    * Register an event listener.
    *
@@ -177,10 +198,14 @@ export interface WorkerNode<Worker extends IWorker, Data = unknown> {
    * Worker node worker.
    */
   readonly worker: Worker
+  /**
+   * Worker node worker info.
+   */
+  info: WorkerInfo
   /**
    * Worker node worker usage statistics.
    */
-  workerUsage: WorkerUsage
+  usage: WorkerUsage
   /**
    * Worker node tasks queue.
    */
index f330d192296dcc62a5b86a4209126d21798acff6..1f724274150f401833df2dec47750528235a578e 100644 (file)
@@ -53,6 +53,10 @@ export interface WorkerStatistics {
  */
 export interface MessageValue<Data = unknown, ErrorData = unknown>
   extends Task<Data> {
+  /**
+   * Worker Id.
+   */
+  readonly workerId?: number
   /**
    * Kill code.
    */
@@ -66,9 +70,13 @@ export interface MessageValue<Data = unknown, ErrorData = unknown>
    */
   readonly taskPerformance?: TaskPerformance
   /**
-   * Whether to compute the given statistics or not.
+   * Whether the worker computes the given statistics or not.
    */
   readonly statistics?: WorkerStatistics
+  /**
+   * Whether the worker has started or not.
+   */
+  readonly started?: boolean
 }
 
 /**
index 25be4820a0e8c59faa8917e0a809633c0d97de83..242c46982236357626ae50fae5f3a33d7c3f04eb 100644 (file)
@@ -36,6 +36,10 @@ export abstract class AbstractWorker<
   Data = unknown,
   Response = unknown
 > extends AsyncResource {
+  /**
+   * Worker Id.
+   */
+  protected abstract id: number
   /**
    * Task function(s) processed by the worker when the pool's `execution` function is invoked.
    */
@@ -225,6 +229,7 @@ export abstract class AbstractWorker<
       this.sendToMainWorker({
         data: res,
         taskPerformance,
+        workerId: this.id,
         id: message.id
       })
     } catch (e) {
@@ -234,6 +239,7 @@ export abstract class AbstractWorker<
           message: err,
           data: message.data
         },
+        workerId: this.id,
         id: message.id
       })
     } finally {
@@ -258,6 +264,7 @@ export abstract class AbstractWorker<
         this.sendToMainWorker({
           data: res,
           taskPerformance,
+          workerId: this.id,
           id: message.id
         })
         return null
@@ -269,6 +276,7 @@ export abstract class AbstractWorker<
             message: err,
             data: message.data
           },
+          workerId: this.id,
           id: message.id
         })
       })
index 13735b1d697c076c547981e9d5aad8bd22633f84..16dddd2969bd50193c4fb43e762d607f08f4ca08 100644 (file)
@@ -41,10 +41,19 @@ export class ClusterWorker<
       cluster.worker as Worker,
       opts
     )
+    if (!this.isMain) {
+      this.sendToMainWorker({ workerId: this.id, started: true })
+    }
+  }
+
+  /** @inheritDoc */
+  protected get id (): number {
+    return this.getMainWorker().id
   }
 
   /** @inheritDoc */
   protected sendToMainWorker (message: MessageValue<Response>): void {
+    console.log('sending message to main worker(cluster)', message)
     this.getMainWorker().send(message)
   }
 
index b6573a974cc653656fb6a968b0437e57eeed7dd6..7a766a958ccbb41c9b29bbe76c04890eec61604d 100644 (file)
@@ -1,4 +1,9 @@
-import { type MessagePort, isMainThread, parentPort } from 'node:worker_threads'
+import {
+  type MessagePort,
+  isMainThread,
+  parentPort,
+  threadId
+} from 'node:worker_threads'
 import type { MessageValue } from '../utility-types'
 import { AbstractWorker } from './abstract-worker'
 import type { WorkerOptions } from './worker-options'
@@ -41,10 +46,18 @@ export class ThreadWorker<
       parentPort as MessagePort,
       opts
     )
+    if (!this.isMain) {
+      this.sendToMainWorker({ workerId: this.id, started: true })
+    }
+  }
+
+  protected get id (): number {
+    return threadId
   }
 
   /** @inheritDoc */
   protected sendToMainWorker (message: MessageValue<Response>): void {
+    console.log('sending message to main worker(thread)', message)
     this.getMainWorker().postMessage(message)
   }
 }
index b5a53487a490dd32b3d460389e25de7c5993feeb..32f5bbeefa611bad7d32967d4701f99ebefd6cdd 100644 (file)
@@ -406,11 +406,14 @@ describe('Abstract pool test suite', () => {
       maxQueuedTasks: 0,
       failedTasks: 0
     })
+    for (const workerNode of pool.workerNodes) {
+      console.log('thread:workerNode.info', workerNode.info)
+    }
     await pool.destroy()
     pool = new DynamicClusterPool(
       numberOfWorkers,
       numberOfWorkers * 2,
-      './tests/worker-files/thread/testWorker.js'
+      './tests/worker-files/cluster/testWorker.js'
     )
     expect(pool.info).toStrictEqual({
       type: PoolTypes.dynamic,
@@ -426,13 +429,16 @@ describe('Abstract pool test suite', () => {
       maxQueuedTasks: 0,
       failedTasks: 0
     })
+    for (const workerNode of pool.workerNodes) {
+      console.log('cluster:workerNode.info', workerNode.info)
+    }
     await pool.destroy()
   })
 
   it('Simulate worker not found', async () => {
     const pool = new StubPoolWithRemoveAllWorker(
       numberOfWorkers,
-      './tests/worker-files/cluster/testWorker.js',
+      './tests/worker-files/thread/testWorker.js',
       {
         errorHandler: e => console.error(e)
       }
@@ -450,7 +456,7 @@ describe('Abstract pool test suite', () => {
       './tests/worker-files/cluster/testWorker.js'
     )
     for (const workerNode of pool.workerNodes) {
-      expect(workerNode.workerUsage).toStrictEqual({
+      expect(workerNode.usage).toStrictEqual({
         tasks: {
           executed: 0,
           executing: 0,
@@ -515,7 +521,7 @@ describe('Abstract pool test suite', () => {
       promises.add(pool.execute())
     }
     for (const workerNode of pool.workerNodes) {
-      expect(workerNode.workerUsage).toStrictEqual({
+      expect(workerNode.usage).toStrictEqual({
         tasks: {
           executed: 0,
           executing: maxMultiplier,
@@ -554,7 +560,7 @@ describe('Abstract pool test suite', () => {
     }
     await Promise.all(promises)
     for (const workerNode of pool.workerNodes) {
-      expect(workerNode.workerUsage).toStrictEqual({
+      expect(workerNode.usage).toStrictEqual({
         tasks: {
           executed: maxMultiplier,
           executing: 0,
@@ -607,7 +613,7 @@ describe('Abstract pool test suite', () => {
     }
     await Promise.all(promises)
     for (const workerNode of pool.workerNodes) {
-      expect(workerNode.workerUsage).toStrictEqual({
+      expect(workerNode.usage).toStrictEqual({
         tasks: {
           executed: expect.any(Number),
           executing: 0,
@@ -643,14 +649,12 @@ describe('Abstract pool test suite', () => {
           utilization: 0
         }
       })
-      expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0)
-      expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
-        maxMultiplier
-      )
+      expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
+      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(maxMultiplier)
     }
     pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
     for (const workerNode of pool.workerNodes) {
-      expect(workerNode.workerUsage).toStrictEqual({
+      expect(workerNode.usage).toStrictEqual({
         tasks: {
           executed: 0,
           executing: 0,
@@ -686,8 +690,8 @@ describe('Abstract pool test suite', () => {
           utilization: 0
         }
       })
-      expect(workerNode.workerUsage.runTime.history.length).toBe(0)
-      expect(workerNode.workerUsage.waitTime.history.length).toBe(0)
+      expect(workerNode.usage.runTime.history.length).toBe(0)
+      expect(workerNode.usage.waitTime.history.length).toBe(0)
     }
     await pool.destroy()
   })
index 8f3fad8b43f919dbc2a8d5c1d695f461cdd7a5fb..631c59f02d177b739c85bf11f2ae219087353f43 100644 (file)
@@ -96,12 +96,12 @@ describe('Fixed cluster pool test suite', () => {
     }
     expect(promises.size).toBe(numberOfWorkers * maxMultiplier)
     for (const workerNode of queuePool.workerNodes) {
-      expect(workerNode.workerUsage.tasks.executing).toBeLessThanOrEqual(
+      expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
         queuePool.opts.tasksQueueOptions.concurrency
       )
-      expect(workerNode.workerUsage.tasks.executed).toBe(0)
-      expect(workerNode.workerUsage.tasks.queued).toBeGreaterThan(0)
-      expect(workerNode.workerUsage.tasks.maxQueued).toBeGreaterThan(0)
+      expect(workerNode.usage.tasks.executed).toBe(0)
+      expect(workerNode.usage.tasks.queued).toBeGreaterThan(0)
+      expect(workerNode.usage.tasks.maxQueued).toBeGreaterThan(0)
     }
     expect(queuePool.info.executingTasks).toBe(numberOfWorkers)
     expect(queuePool.info.queuedTasks).toBe(
@@ -112,13 +112,11 @@ describe('Fixed cluster pool test suite', () => {
     )
     await Promise.all(promises)
     for (const workerNode of queuePool.workerNodes) {
-      expect(workerNode.workerUsage.tasks.executing).toBe(0)
-      expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0)
-      expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
-        maxMultiplier
-      )
-      expect(workerNode.workerUsage.tasks.queued).toBe(0)
-      expect(workerNode.workerUsage.tasks.maxQueued).toBe(1)
+      expect(workerNode.usage.tasks.executing).toBe(0)
+      expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
+      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(maxMultiplier)
+      expect(workerNode.usage.tasks.queued).toBe(0)
+      expect(workerNode.usage.tasks.maxQueued).toBe(1)
     }
   })
 
@@ -154,7 +152,7 @@ describe('Fixed cluster pool test suite', () => {
     })
     expect(
       errorPool.workerNodes.some(
-        workerNode => workerNode.workerUsage.tasks.failed === 1
+        workerNode => workerNode.usage.tasks.failed === 1
       )
     ).toBe(true)
   })
@@ -180,7 +178,7 @@ describe('Fixed cluster pool test suite', () => {
     })
     expect(
       asyncErrorPool.workerNodes.some(
-        workerNode => workerNode.workerUsage.tasks.failed === 1
+        workerNode => workerNode.usage.tasks.failed === 1
       )
     ).toBe(true)
   })
index dd5a398fc9a0b101b5a6fe27c3a1c38accaa94b9..b2aa4a0b69fbd4e006ce6f53fecb90295edb535d 100644 (file)
@@ -209,7 +209,7 @@ describe('Selection strategies test suite', () => {
     }
     await Promise.all(promises)
     for (const workerNode of pool.workerNodes) {
-      expect(workerNode.workerUsage).toStrictEqual({
+      expect(workerNode.usage).toStrictEqual({
         tasks: {
           executed: maxMultiplier,
           executing: 0,
@@ -270,7 +270,7 @@ describe('Selection strategies test suite', () => {
     }
     await Promise.all(promises)
     for (const workerNode of pool.workerNodes) {
-      expect(workerNode.workerUsage).toStrictEqual({
+      expect(workerNode.usage).toStrictEqual({
         tasks: {
           executed: maxMultiplier,
           executing: 0,
@@ -476,7 +476,7 @@ describe('Selection strategies test suite', () => {
     }
     await Promise.all(promises)
     for (const workerNode of pool.workerNodes) {
-      expect(workerNode.workerUsage).toStrictEqual({
+      expect(workerNode.usage).toStrictEqual({
         tasks: {
           executed: expect.any(Number),
           executing: 0,
@@ -512,8 +512,8 @@ describe('Selection strategies test suite', () => {
           utilization: 0
         }
       })
-      expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
-      expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
+      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
         max * maxMultiplier
       )
     }
@@ -536,7 +536,7 @@ describe('Selection strategies test suite', () => {
     }
     await Promise.all(promises)
     for (const workerNode of pool.workerNodes) {
-      expect(workerNode.workerUsage).toStrictEqual({
+      expect(workerNode.usage).toStrictEqual({
         tasks: {
           executed: expect.any(Number),
           executing: 0,
@@ -572,8 +572,8 @@ describe('Selection strategies test suite', () => {
           utilization: 0
         }
       })
-      expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
-      expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
+      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
         max * maxMultiplier
       )
     }
@@ -675,7 +675,7 @@ describe('Selection strategies test suite', () => {
     }
     await Promise.all(promises)
     for (const workerNode of pool.workerNodes) {
-      expect(workerNode.workerUsage).toStrictEqual({
+      expect(workerNode.usage).toStrictEqual({
         tasks: {
           executed: expect.any(Number),
           executing: 0,
@@ -711,14 +711,12 @@ describe('Selection strategies test suite', () => {
           utilization: 0
         }
       })
-      expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
-      expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
+      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
         max * maxMultiplier
       )
-      expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThanOrEqual(0)
-      expect(workerNode.workerUsage.waitTime.aggregate).toBeGreaterThanOrEqual(
-        0
-      )
+      expect(workerNode.usage.runTime.aggregate).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.waitTime.aggregate).toBeGreaterThanOrEqual(0)
     }
     // We need to clean up the resources after our test
     await pool.destroy()
@@ -739,7 +737,7 @@ describe('Selection strategies test suite', () => {
     }
     await Promise.all(promises)
     for (const workerNode of pool.workerNodes) {
-      expect(workerNode.workerUsage).toStrictEqual({
+      expect(workerNode.usage).toStrictEqual({
         tasks: {
           executed: expect.any(Number),
           executing: 0,
@@ -775,14 +773,12 @@ describe('Selection strategies test suite', () => {
           utilization: 0
         }
       })
-      expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
-      expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
+      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
         max * maxMultiplier
       )
-      expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThanOrEqual(0)
-      expect(workerNode.workerUsage.waitTime.aggregate).toBeGreaterThanOrEqual(
-        0
-      )
+      expect(workerNode.usage.runTime.aggregate).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.waitTime.aggregate).toBeGreaterThanOrEqual(0)
     }
     // We need to clean up the resources after our test
     await pool.destroy()
@@ -882,7 +878,7 @@ describe('Selection strategies test suite', () => {
     }
     await Promise.all(promises)
     for (const workerNode of pool.workerNodes) {
-      expect(workerNode.workerUsage).toStrictEqual({
+      expect(workerNode.usage).toStrictEqual({
         tasks: {
           executed: expect.any(Number),
           executing: 0,
@@ -918,12 +914,12 @@ describe('Selection strategies test suite', () => {
           utilization: expect.any(Number)
         }
       })
-      expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
-      expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
+      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
         max * maxMultiplier
       )
-      expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0)
-      expect(workerNode.workerUsage.elu.utilization).toBeLessThanOrEqual(1)
+      expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
     }
     // We need to clean up the resources after our test
     await pool.destroy()
@@ -944,7 +940,7 @@ describe('Selection strategies test suite', () => {
     }
     await Promise.all(promises)
     for (const workerNode of pool.workerNodes) {
-      expect(workerNode.workerUsage).toStrictEqual({
+      expect(workerNode.usage).toStrictEqual({
         tasks: {
           executed: expect.any(Number),
           executing: 0,
@@ -980,12 +976,12 @@ describe('Selection strategies test suite', () => {
           utilization: expect.any(Number)
         }
       })
-      expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
-      expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
+      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
         max * maxMultiplier
       )
-      expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0)
-      expect(workerNode.workerUsage.elu.utilization).toBeLessThanOrEqual(1)
+      expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
     }
     // We need to clean up the resources after our test
     await pool.destroy()
@@ -1085,7 +1081,7 @@ describe('Selection strategies test suite', () => {
     }
     await Promise.all(promises)
     for (const workerNode of pool.workerNodes) {
-      expect(workerNode.workerUsage).toStrictEqual({
+      expect(workerNode.usage).toStrictEqual({
         tasks: {
           executed: expect.any(Number),
           executing: 0,
@@ -1121,14 +1117,14 @@ describe('Selection strategies test suite', () => {
           utilization: expect.any(Number)
         }
       })
-      expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
-      expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
+      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
         max * maxMultiplier
       )
-      expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThanOrEqual(0)
-      expect(workerNode.workerUsage.runTime.average).toBeGreaterThanOrEqual(0)
-      expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0)
-      expect(workerNode.workerUsage.elu.utilization).toBeLessThanOrEqual(1)
+      expect(workerNode.usage.runTime.aggregate).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.runTime.average).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
     }
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
@@ -1154,7 +1150,7 @@ describe('Selection strategies test suite', () => {
     }
     await Promise.all(promises)
     for (const workerNode of pool.workerNodes) {
-      expect(workerNode.workerUsage).toStrictEqual({
+      expect(workerNode.usage).toStrictEqual({
         tasks: {
           executed: expect.any(Number),
           executing: 0,
@@ -1190,14 +1186,14 @@ describe('Selection strategies test suite', () => {
           utilization: expect.any(Number)
         }
       })
-      expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
-      expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
+      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
         max * maxMultiplier
       )
-      expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThanOrEqual(0)
-      expect(workerNode.workerUsage.runTime.average).toBeGreaterThanOrEqual(0)
-      expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0)
-      expect(workerNode.workerUsage.elu.utilization).toBeLessThanOrEqual(1)
+      expect(workerNode.usage.runTime.aggregate).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.runTime.average).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
     }
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
@@ -1228,7 +1224,7 @@ describe('Selection strategies test suite', () => {
     }
     await Promise.all(promises)
     for (const workerNode of pool.workerNodes) {
-      expect(workerNode.workerUsage).toStrictEqual({
+      expect(workerNode.usage).toStrictEqual({
         tasks: {
           executed: expect.any(Number),
           executing: 0,
@@ -1264,14 +1260,14 @@ describe('Selection strategies test suite', () => {
           utilization: expect.any(Number)
         }
       })
-      expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
-      expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
+      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
         max * maxMultiplier
       )
-      expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThanOrEqual(0)
-      expect(workerNode.workerUsage.runTime.median).toBeGreaterThanOrEqual(0)
-      expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0)
-      expect(workerNode.workerUsage.elu.utilization).toBeLessThanOrEqual(1)
+      expect(workerNode.usage.runTime.aggregate).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.runTime.median).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1)
     }
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
@@ -1450,7 +1446,7 @@ describe('Selection strategies test suite', () => {
     }
     await Promise.all(promises)
     for (const workerNode of pool.workerNodes) {
-      expect(workerNode.workerUsage).toStrictEqual({
+      expect(workerNode.usage).toStrictEqual({
         tasks: {
           executed: expect.any(Number),
           executing: 0,
@@ -1486,12 +1482,12 @@ describe('Selection strategies test suite', () => {
           utilization: 0
         }
       })
-      expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
-      expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
+      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
         max * maxMultiplier
       )
-      expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThanOrEqual(0)
-      expect(workerNode.workerUsage.runTime.average).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.runTime.aggregate).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.runTime.average).toBeGreaterThanOrEqual(0)
     }
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
@@ -1522,7 +1518,7 @@ describe('Selection strategies test suite', () => {
     }
     await Promise.all(promises)
     for (const workerNode of pool.workerNodes) {
-      expect(workerNode.workerUsage).toStrictEqual({
+      expect(workerNode.usage).toStrictEqual({
         tasks: {
           executed: expect.any(Number),
           executing: 0,
@@ -1558,12 +1554,12 @@ describe('Selection strategies test suite', () => {
           utilization: 0
         }
       })
-      expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
-      expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
+      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
         max * maxMultiplier
       )
-      expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThan(0)
-      expect(workerNode.workerUsage.runTime.average).toBeGreaterThan(0)
+      expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
+      expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
     }
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
@@ -1599,7 +1595,7 @@ describe('Selection strategies test suite', () => {
     }
     await Promise.all(promises)
     for (const workerNode of pool.workerNodes) {
-      expect(workerNode.workerUsage).toStrictEqual({
+      expect(workerNode.usage).toStrictEqual({
         tasks: {
           executed: expect.any(Number),
           executing: 0,
@@ -1635,12 +1631,12 @@ describe('Selection strategies test suite', () => {
           utilization: 0
         }
       })
-      expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
-      expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
+      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
         max * maxMultiplier
       )
-      expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThan(0)
-      expect(workerNode.workerUsage.runTime.median).toBeGreaterThan(0)
+      expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
+      expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
     }
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
@@ -1833,7 +1829,7 @@ describe('Selection strategies test suite', () => {
     }
     await Promise.all(promises)
     for (const workerNode of pool.workerNodes) {
-      expect(workerNode.workerUsage).toStrictEqual({
+      expect(workerNode.usage).toStrictEqual({
         tasks: {
           executed: maxMultiplier,
           executing: 0,
@@ -1916,7 +1912,7 @@ describe('Selection strategies test suite', () => {
     }
     await Promise.all(promises)
     for (const workerNode of pool.workerNodes) {
-      expect(workerNode.workerUsage).toStrictEqual({
+      expect(workerNode.usage).toStrictEqual({
         tasks: {
           executed: maxMultiplier,
           executing: 0,
index 017cd7e3fe75a57de564d8895dbc95ae964e16d6..8a0cfeadcbd4660b63e604d5214674d4328f8adb 100644 (file)
@@ -96,12 +96,12 @@ describe('Fixed thread pool test suite', () => {
     }
     expect(promises.size).toBe(numberOfThreads * maxMultiplier)
     for (const workerNode of queuePool.workerNodes) {
-      expect(workerNode.workerUsage.tasks.executing).toBeLessThanOrEqual(
+      expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
         queuePool.opts.tasksQueueOptions.concurrency
       )
-      expect(workerNode.workerUsage.tasks.executed).toBe(0)
-      expect(workerNode.workerUsage.tasks.queued).toBeGreaterThan(0)
-      expect(workerNode.workerUsage.tasks.maxQueued).toBeGreaterThan(0)
+      expect(workerNode.usage.tasks.executed).toBe(0)
+      expect(workerNode.usage.tasks.queued).toBeGreaterThan(0)
+      expect(workerNode.usage.tasks.maxQueued).toBeGreaterThan(0)
     }
     expect(queuePool.info.executingTasks).toBe(numberOfThreads)
     expect(queuePool.info.queuedTasks).toBe(
@@ -112,13 +112,11 @@ describe('Fixed thread pool test suite', () => {
     )
     await Promise.all(promises)
     for (const workerNode of queuePool.workerNodes) {
-      expect(workerNode.workerUsage.tasks.executing).toBe(0)
-      expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0)
-      expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
-        maxMultiplier
-      )
-      expect(workerNode.workerUsage.tasks.queued).toBe(0)
-      expect(workerNode.workerUsage.tasks.maxQueued).toBe(1)
+      expect(workerNode.usage.tasks.executing).toBe(0)
+      expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
+      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(maxMultiplier)
+      expect(workerNode.usage.tasks.queued).toBe(0)
+      expect(workerNode.usage.tasks.maxQueued).toBe(1)
     }
   })
 
@@ -156,7 +154,7 @@ describe('Fixed thread pool test suite', () => {
     })
     expect(
       errorPool.workerNodes.some(
-        workerNode => workerNode.workerUsage.tasks.failed === 1
+        workerNode => workerNode.usage.tasks.failed === 1
       )
     ).toBe(true)
   })
@@ -184,7 +182,7 @@ describe('Fixed thread pool test suite', () => {
     })
     expect(
       asyncErrorPool.workerNodes.some(
-        workerNode => workerNode.workerUsage.tasks.failed === 1
+        workerNode => workerNode.usage.tasks.failed === 1
       )
     ).toBe(true)
   })
@@ -210,7 +208,7 @@ describe('Fixed thread pool test suite', () => {
   })
 
   it('Verify that thread pool options are checked', async () => {
-    const workerFilePath = './tests/worker-files/cluster/testWorker.js'
+    const workerFilePath = './tests/worker-files/thread/testWorker.js'
     let pool1 = new FixedThreadPool(numberOfThreads, workerFilePath)
     expect(pool1.opts.workerOptions).toBeUndefined()
     await pool1.destroy()