perf: use a single array to store pool workers and their related data
authorJérôme Benoit <jerome.benoit@sap.com>
Sun, 2 Apr 2023 20:10:47 +0000 (22:10 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Sun, 2 Apr 2023 20:10:47 +0000 (22:10 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
13 files changed:
src/pools/abstract-pool.ts
src/pools/cluster/dynamic.ts
src/pools/pool-internal.ts
src/pools/selection-strategies/fair-share-worker-choice-strategy.ts
src/pools/selection-strategies/less-busy-worker-choice-strategy.ts
src/pools/selection-strategies/less-used-worker-choice-strategy.ts
src/pools/selection-strategies/round-robin-worker-choice-strategy.ts
src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts
src/pools/thread/dynamic.ts
tests/pools/abstract/abstract-pool.test.js
tests/pools/cluster/dynamic.test.js
tests/pools/thread/dynamic.test.js
tests/test-utils.js

index 947db17c8480ea97de0591c280042081c0ba4935..5ed353f602d4f0420e6fca66044122c749098074 100644 (file)
@@ -29,19 +29,11 @@ export abstract class AbstractPool<
   Response = unknown
 > implements IPoolInternal<Worker, Data, Response> {
   /** {@inheritDoc} */
-  public readonly workers: Map<number, WorkerType<Worker>> = new Map<
-  number,
-  WorkerType<Worker>
-  >()
+  public readonly workers: Array<WorkerType<Worker>> = []
 
   /** {@inheritDoc} */
   public readonly emitter?: PoolEmitter
 
-  /**
-   * Id of the next worker.
-   */
-  protected nextWorkerId: number = 0
-
   /**
    * The promise map.
    *
@@ -159,8 +151,8 @@ export abstract class AbstractPool<
    * @param worker - The worker.
    * @returns The worker key.
    */
-  private getWorkerKey (worker: Worker): number | undefined {
-    return [...this.workers].find(([, value]) => value.worker === worker)?.[0]
+  private getWorkerKey (worker: Worker): number {
+    return this.workers.findIndex(workerItem => workerItem.worker === worker)
   }
 
   /** {@inheritDoc} */
@@ -168,8 +160,8 @@ export abstract class AbstractPool<
     workerChoiceStrategy: WorkerChoiceStrategy
   ): void {
     this.opts.workerChoiceStrategy = workerChoiceStrategy
-    for (const [key, value] of this.workers) {
-      this.setWorker(key, value.worker, {
+    for (const workerItem of this.workers) {
+      this.setWorker(workerItem.worker, {
         run: 0,
         running: 0,
         runTime: 0,
@@ -193,10 +185,10 @@ export abstract class AbstractPool<
 
   /** {@inheritDoc} */
   public findFreeWorker (): Worker | false {
-    for (const value of this.workers.values()) {
-      if (value.tasksUsage.running === 0) {
+    for (const workerItem of this.workers) {
+      if (workerItem.tasksUsage.running === 0) {
         // A worker is free, return the matching worker
-        return value.worker
+        return workerItem.worker
       }
     }
     return false
@@ -220,8 +212,8 @@ export abstract class AbstractPool<
   /** {@inheritDoc} */
   public async destroy (): Promise<void> {
     await Promise.all(
-      [...this.workers].map(async ([, value]) => {
-        await this.destroyWorker(value.worker)
+      this.workers.map(async workerItem => {
+        await this.destroyWorker(workerItem.worker)
       })
     )
   }
@@ -290,8 +282,7 @@ export abstract class AbstractPool<
    * @param worker - The worker that will be removed.
    */
   protected removeWorker (worker: Worker): void {
-    this.workers.delete(this.getWorkerKey(worker) as number)
-    --this.nextWorkerId
+    this.workers.splice(this.getWorkerKey(worker), 1)
   }
 
   /**
@@ -356,13 +347,12 @@ export abstract class AbstractPool<
       this.removeWorker(worker)
     })
 
-    this.setWorker(this.nextWorkerId, worker, {
+    this.setWorker(worker, {
       run: 0,
       running: 0,
       runTime: 0,
       avgRunTime: 0
     })
-    ++this.nextWorkerId
 
     this.afterWorkerSetup(worker)
 
@@ -410,8 +400,8 @@ export abstract class AbstractPool<
   /** {@inheritDoc} */
   public getWorkerTasksUsage (worker: Worker): TasksUsage | undefined {
     const workerKey = this.getWorkerKey(worker)
-    if (workerKey !== undefined) {
-      return (this.workers.get(workerKey) as WorkerType<Worker>).tasksUsage
+    if (workerKey !== -1) {
+      return this.workers[workerKey].tasksUsage
     }
     throw new Error('Worker could not be found in the pool')
   }
@@ -419,16 +409,11 @@ export abstract class AbstractPool<
   /**
    * Sets the given worker.
    *
-   * @param workerKey - The worker key.
    * @param worker - The worker.
    * @param tasksUsage - The worker tasks usage.
    */
-  private setWorker (
-    workerKey: number,
-    worker: Worker,
-    tasksUsage: TasksUsage
-  ): void {
-    this.workers.set(workerKey, {
+  private setWorker (worker: Worker, tasksUsage: TasksUsage): void {
+    this.workers.push({
       worker,
       tasksUsage
     })
index 0a2d0f3879fc6ab44129608fec6d9c6624cd4c1a..8b8c5185772dbea59bc0d22b2e1cb90b9383d507 100644 (file)
@@ -41,6 +41,6 @@ export class DynamicClusterPool<
 
   /** {@inheritDoc} */
   public get busy (): boolean {
-    return this.workers.size === this.max
+    return this.workers.length === this.max
   }
 }
index 9da6495266ab6c6e4654def94eca460b4630cb24..9489d96f1c1ffc3ef548a06423b2bb05647bf277 100644 (file)
@@ -42,9 +42,9 @@ export interface IPoolInternal<
   Response = unknown
 > extends IPool<Data, Response> {
   /**
-   * Pool workers map.
+   * Pool workers item array.
    */
-  readonly workers: Map<number, WorkerType<Worker>>
+  readonly workers: Array<WorkerType<Worker>>
 
   /**
    * Pool type.
index 7415cf4e34ccc1ac9bbd68e7d7098ba70030f633..48198371b54a7d2f8807dcfdd10e53cdc10a579f 100644 (file)
@@ -46,8 +46,8 @@ export class FairShareWorkerChoiceStrategy<
   public choose (): Worker {
     let minWorkerVirtualTaskEndTimestamp = Infinity
     let chosenWorker!: Worker
-    for (const value of this.pool.workers.values()) {
-      const worker = value.worker
+    for (const workerItem of this.pool.workers) {
+      const worker = workerItem.worker
       this.computeWorkerLastVirtualTaskTimestamp(worker)
       const workerLastVirtualTaskEndTimestamp =
         this.workerLastVirtualTaskTimestamp.get(worker)?.end ?? 0
index 769f3fb6b5626eed0841a1b6acf86f7d3ebe0bcd..31e7881a2d214d9be2b36a7fcafd909a667d3a89 100644 (file)
@@ -28,8 +28,8 @@ export class LessBusyWorkerChoiceStrategy<
   public choose (): Worker {
     let minRunTime = Infinity
     let lessBusyWorker!: Worker
-    for (const value of this.pool.workers.values()) {
-      const worker = value.worker
+    for (const workerItem of this.pool.workers) {
+      const worker = workerItem.worker
       const workerRunTime = this.pool.getWorkerTasksUsage(worker)
         ?.runTime as number
       if (!this.isDynamicPool && workerRunTime === 0) {
index 61c8fb20643319deb4f64b1e9a7054dc4ea13719..833e605babc9cb0b2f1fcf46d4a823b291e5fa73 100644 (file)
@@ -22,8 +22,8 @@ export class LessUsedWorkerChoiceStrategy<
   public choose (): Worker {
     let minNumberOfTasks = Infinity
     let lessUsedWorker!: Worker
-    for (const value of this.pool.workers.values()) {
-      const worker = value.worker
+    for (const workerItem of this.pool.workers) {
+      const worker = workerItem.worker
       const tasksUsage = this.pool.getWorkerTasksUsage(worker)
       const workerTasks =
         (tasksUsage?.run as number) + (tasksUsage?.running as number)
index 44dfa4b76a941fe6caa005fe07bf5dc5ccd34644..a23275873909b0731cb3d514af74f0b9fb59090a 100644 (file)
@@ -26,10 +26,9 @@ export class RoundRobinWorkerChoiceStrategy<
 
   /** {@inheritDoc} */
   public choose (): Worker {
-    const chosenWorker = this.pool.workers.get(this.nextWorkerId)
-      ?.worker as Worker
+    const chosenWorker = this.pool.workers[this.nextWorkerId]?.worker
     this.nextWorkerId =
-      this.nextWorkerId === this.pool.workers.size - 1
+      this.nextWorkerId === this.pool.workers.length - 1
         ? 0
         : this.nextWorkerId + 1
     return chosenWorker
index 1af7e6568f19a0705df40014ff404e91645dfd09..9ee4fa656632c7fcdfc982e5b7cf40b794d57e97 100644 (file)
@@ -67,8 +67,7 @@ export class WeightedRoundRobinWorkerChoiceStrategy<
 
   /** {@inheritDoc} */
   public choose (): Worker {
-    const chosenWorker = this.pool.workers.get(this.currentWorkerId)
-      ?.worker as Worker
+    const chosenWorker = this.pool.workers[this.currentWorkerId]?.worker
     if (this.isDynamicPool && !this.workersTaskRunTime.has(chosenWorker)) {
       this.initWorkerTaskRunTime(chosenWorker)
     }
@@ -86,11 +85,11 @@ export class WeightedRoundRobinWorkerChoiceStrategy<
       )
     } else {
       this.currentWorkerId =
-        this.currentWorkerId === this.pool.workers.size - 1
+        this.currentWorkerId === this.pool.workers.length - 1
           ? 0
           : this.currentWorkerId + 1
       this.setWorkerTaskRunTime(
-        this.pool.workers.get(this.currentWorkerId)?.worker as Worker,
+        this.pool.workers[this.currentWorkerId]?.worker,
         workerTaskWeight,
         0
       )
@@ -99,8 +98,8 @@ export class WeightedRoundRobinWorkerChoiceStrategy<
   }
 
   private initWorkersTaskRunTime (): void {
-    for (const value of this.pool.workers.values()) {
-      this.initWorkerTaskRunTime(value.worker)
+    for (const workerItem of this.pool.workers) {
+      this.initWorkerTaskRunTime(workerItem.worker)
     }
   }
 
index d5be43f189f469b46ce16ef9bcdbf4720e33b593..19cb9fbf6b9a21a649c9fe548955e62abc6c750f 100644 (file)
@@ -42,6 +42,6 @@ export class DynamicThreadPool<
 
   /** {@inheritDoc} */
   public get busy (): boolean {
-    return this.workers.size === this.max
+    return this.workers.length === this.max
   }
 }
index 83290d6a0553243840e9c8b313d2ececacd4386d..a87c8934701bd442af287fcdc91e27af33eeda41 100644 (file)
@@ -13,7 +13,7 @@ describe('Abstract pool test suite', () => {
   )
   class StubPoolWithRemoveAllWorker extends FixedThreadPool {
     removeAllWorker () {
-      this.workers = new Map()
+      this.workers = []
       this.promiseMap.clear()
     }
   }
@@ -139,12 +139,12 @@ describe('Abstract pool test suite', () => {
       numberOfWorkers,
       './tests/worker-files/cluster/testWorker.js'
     )
-    for (const value of pool.workers.values()) {
-      expect(value.tasksUsage).toBeDefined()
-      expect(value.tasksUsage.run).toBe(0)
-      expect(value.tasksUsage.running).toBe(0)
-      expect(value.tasksUsage.runTime).toBe(0)
-      expect(value.tasksUsage.avgRunTime).toBe(0)
+    for (const workerItem of pool.workers) {
+      expect(workerItem.tasksUsage).toBeDefined()
+      expect(workerItem.tasksUsage.run).toBe(0)
+      expect(workerItem.tasksUsage.running).toBe(0)
+      expect(workerItem.tasksUsage.runTime).toBe(0)
+      expect(workerItem.tasksUsage.avgRunTime).toBe(0)
     }
     await pool.destroy()
   })
@@ -158,20 +158,20 @@ describe('Abstract pool test suite', () => {
     for (let i = 0; i < numberOfWorkers * 2; i++) {
       promises.push(pool.execute())
     }
-    for (const value of pool.workers.values()) {
-      expect(value.tasksUsage).toBeDefined()
-      expect(value.tasksUsage.run).toBe(0)
-      expect(value.tasksUsage.running).toBe(numberOfWorkers * 2)
-      expect(value.tasksUsage.runTime).toBe(0)
-      expect(value.tasksUsage.avgRunTime).toBe(0)
+    for (const workerItem of pool.workers) {
+      expect(workerItem.tasksUsage).toBeDefined()
+      expect(workerItem.tasksUsage.run).toBe(0)
+      expect(workerItem.tasksUsage.running).toBe(numberOfWorkers * 2)
+      expect(workerItem.tasksUsage.runTime).toBe(0)
+      expect(workerItem.tasksUsage.avgRunTime).toBe(0)
     }
     await Promise.all(promises)
-    for (const value of pool.workers.values()) {
-      expect(value.tasksUsage).toBeDefined()
-      expect(value.tasksUsage.run).toBe(numberOfWorkers * 2)
-      expect(value.tasksUsage.running).toBe(0)
-      expect(value.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
-      expect(value.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
+    for (const workerItem of pool.workers) {
+      expect(workerItem.tasksUsage).toBeDefined()
+      expect(workerItem.tasksUsage.run).toBe(numberOfWorkers * 2)
+      expect(workerItem.tasksUsage.running).toBe(0)
+      expect(workerItem.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
+      expect(workerItem.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
     }
     await pool.destroy()
   })
@@ -187,20 +187,20 @@ describe('Abstract pool test suite', () => {
       promises.push(pool.execute())
     }
     await Promise.all(promises)
-    for (const value of pool.workers.values()) {
-      expect(value.tasksUsage).toBeDefined()
-      expect(value.tasksUsage.run).toBe(numberOfWorkers * 2)
-      expect(value.tasksUsage.running).toBe(0)
-      expect(value.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
-      expect(value.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
+    for (const workerItem of pool.workers) {
+      expect(workerItem.tasksUsage).toBeDefined()
+      expect(workerItem.tasksUsage.run).toBe(numberOfWorkers * 2)
+      expect(workerItem.tasksUsage.running).toBe(0)
+      expect(workerItem.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
+      expect(workerItem.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
     }
     pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
-    for (const value of pool.workers.values()) {
-      expect(value.tasksUsage).toBeDefined()
-      expect(value.tasksUsage.run).toBe(0)
-      expect(value.tasksUsage.running).toBe(0)
-      expect(value.tasksUsage.runTime).toBe(0)
-      expect(value.tasksUsage.avgRunTime).toBe(0)
+    for (const workerItem of pool.workers) {
+      expect(workerItem.tasksUsage).toBeDefined()
+      expect(workerItem.tasksUsage.run).toBe(0)
+      expect(workerItem.tasksUsage.running).toBe(0)
+      expect(workerItem.tasksUsage.runTime).toBe(0)
+      expect(workerItem.tasksUsage.avgRunTime).toBe(0)
     }
     await pool.destroy()
   })
index 7a834ca237d64df507036ee797b7c6e315d140fe..ddf42501d43ed63a3f84831b3aa2b13ba44fc701 100644 (file)
@@ -32,8 +32,8 @@ describe('Dynamic cluster pool test suite', () => {
     for (let i = 0; i < max * 2; i++) {
       pool.execute()
     }
-    expect(pool.workers.size).toBeLessThanOrEqual(max)
-    expect(pool.workers.size).toBeGreaterThan(min)
+    expect(pool.workers.length).toBeLessThanOrEqual(max)
+    expect(pool.workers.length).toBeGreaterThan(min)
     // The `busy` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
     // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
     expect(poolBusy).toBe(max + 1)
@@ -42,19 +42,19 @@ describe('Dynamic cluster pool test suite', () => {
   })
 
   it('Verify scale worker up and down is working', async () => {
-    expect(pool.workers.size).toBe(min)
+    expect(pool.workers.length).toBe(min)
     for (let i = 0; i < max * 10; i++) {
       pool.execute()
     }
-    expect(pool.workers.size).toBeGreaterThan(min)
+    expect(pool.workers.length).toBeGreaterThan(min)
     await TestUtils.waitExits(pool, max - min)
-    expect(pool.workers.size).toBe(min)
+    expect(pool.workers.length).toBe(min)
     for (let i = 0; i < max * 10; i++) {
       pool.execute()
     }
-    expect(pool.workers.size).toBeGreaterThan(min)
+    expect(pool.workers.length).toBeGreaterThan(min)
     await TestUtils.waitExits(pool, max - min)
-    expect(pool.workers.size).toBe(min)
+    expect(pool.workers.length).toBe(min)
   })
 
   it('Shutdown test', async () => {
@@ -93,14 +93,14 @@ describe('Dynamic cluster pool test suite', () => {
         exitHandler: () => console.log('long running worker exited')
       }
     )
-    expect(longRunningPool.workers.size).toBe(min)
+    expect(longRunningPool.workers.length).toBe(min)
     for (let i = 0; i < max * 10; i++) {
       longRunningPool.execute()
     }
-    expect(longRunningPool.workers.size).toBe(max)
+    expect(longRunningPool.workers.length).toBe(max)
     await TestUtils.waitExits(longRunningPool, max - min)
     // Here we expect the workers to be at the max size since that the task is still running
-    expect(longRunningPool.workers.size).toBe(min)
+    expect(longRunningPool.workers.length).toBe(min)
     // We need to clean up the resources after our test
     await longRunningPool.destroy()
   })
@@ -116,14 +116,14 @@ describe('Dynamic cluster pool test suite', () => {
         exitHandler: () => console.log('long running worker exited')
       }
     )
-    expect(longRunningPool.workers.size).toBe(min)
+    expect(longRunningPool.workers.length).toBe(min)
     for (let i = 0; i < max * 10; i++) {
       longRunningPool.execute()
     }
-    expect(longRunningPool.workers.size).toBe(max)
+    expect(longRunningPool.workers.length).toBe(max)
     await TestUtils.sleep(1500)
     // Here we expect the workers to be at the max size since that the task is still running
-    expect(longRunningPool.workers.size).toBe(max)
+    expect(longRunningPool.workers.length).toBe(max)
     // We need to clean up the resources after our test
     await longRunningPool.destroy()
   })
index b544452fce8351a9ed9f10136f45547b53b2b343..9036642f1b00a87bd9c8eeebb9a72ffa9abd81a0 100644 (file)
@@ -32,8 +32,8 @@ describe('Dynamic thread pool test suite', () => {
     for (let i = 0; i < max * 2; i++) {
       pool.execute()
     }
-    expect(pool.workers.size).toBeLessThanOrEqual(max)
-    expect(pool.workers.size).toBeGreaterThan(min)
+    expect(pool.workers.length).toBeLessThanOrEqual(max)
+    expect(pool.workers.length).toBeGreaterThan(min)
     // The `busy` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
     // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
     expect(poolBusy).toBe(max + 1)
@@ -42,19 +42,19 @@ describe('Dynamic thread pool test suite', () => {
   })
 
   it('Verify scale thread up and down is working', async () => {
-    expect(pool.workers.size).toBe(min)
+    expect(pool.workers.length).toBe(min)
     for (let i = 0; i < max * 10; i++) {
       pool.execute()
     }
-    expect(pool.workers.size).toBe(max)
+    expect(pool.workers.length).toBe(max)
     await TestUtils.waitExits(pool, max - min)
-    expect(pool.workers.size).toBe(min)
+    expect(pool.workers.length).toBe(min)
     for (let i = 0; i < max * 10; i++) {
       pool.execute()
     }
-    expect(pool.workers.size).toBe(max)
+    expect(pool.workers.length).toBe(max)
     await TestUtils.waitExits(pool, max - min)
-    expect(pool.workers.size).toBe(min)
+    expect(pool.workers.length).toBe(min)
   })
 
   it('Shutdown test', async () => {
@@ -93,13 +93,13 @@ describe('Dynamic thread pool test suite', () => {
         exitHandler: () => console.log('long running worker exited')
       }
     )
-    expect(longRunningPool.workers.size).toBe(min)
+    expect(longRunningPool.workers.length).toBe(min)
     for (let i = 0; i < max * 10; i++) {
       longRunningPool.execute()
     }
-    expect(longRunningPool.workers.size).toBe(max)
+    expect(longRunningPool.workers.length).toBe(max)
     await TestUtils.waitExits(longRunningPool, max - min)
-    expect(longRunningPool.workers.size).toBe(min)
+    expect(longRunningPool.workers.length).toBe(min)
     // We need to clean up the resources after our test
     await longRunningPool.destroy()
   })
@@ -115,14 +115,14 @@ describe('Dynamic thread pool test suite', () => {
         exitHandler: () => console.log('long running worker exited')
       }
     )
-    expect(longRunningPool.workers.size).toBe(min)
+    expect(longRunningPool.workers.length).toBe(min)
     for (let i = 0; i < max * 10; i++) {
       longRunningPool.execute()
     }
-    expect(longRunningPool.workers.size).toBe(max)
+    expect(longRunningPool.workers.length).toBe(max)
     await TestUtils.sleep(1500)
     // Here we expect the workers to be at the max size since that the task is still running
-    expect(longRunningPool.workers.size).toBe(max)
+    expect(longRunningPool.workers.length).toBe(max)
     // We need to clean up the resources after our test
     await longRunningPool.destroy()
   })
index 7314f261dffee839dd15e44b26eece688920ecfa..6d3592c1b5d2e2798c00b95fe213c1c630370c96 100644 (file)
@@ -4,8 +4,8 @@ class TestUtils {
   static async waitExits (pool, numberOfExitEventsToWait) {
     return new Promise(resolve => {
       let exitEvents = 0
-      for (const value of pool.workers.values()) {
-        value.worker.on('exit', () => {
+      for (const workerItem of pool.workers) {
+        workerItem.worker.on('exit', () => {
           ++exitEvents
           if (exitEvents === numberOfExitEventsToWait) {
             resolve(exitEvents)