feat: use O(1) queue implementation
authorJérôme Benoit <jerome.benoit@sap.com>
Thu, 4 May 2023 22:00:47 +0000 (00:00 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Thu, 4 May 2023 22:00:47 +0000 (00:00 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
CHANGELOG.md
src/pools/abstract-pool.ts
src/pools/worker.ts
src/queue.ts [new file with mode: 0644]
tests/pools/abstract/abstract-pool.test.js
tests/pools/cluster/fixed.test.js
tests/pools/thread/fixed.test.js

index 631e2d66d1acbadfa75f06348b1c4995aa4b328b..3d11d6f20be71c394f5cdce00e6dbfa01e6cf8f9 100644 (file)
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Changed
+
+- Use O(1) queue implementation for tasks queueing.
+
 ## [2.4.11] - 2023-04-23
 
 ### Changed
index 2ee3a0ccb968100e23a5deecfe2eb4e049171875..8ab6b26e4f385f54f7c46d5cc51f1aa5f5ea0212 100644 (file)
@@ -7,6 +7,7 @@ import {
 } from '../utils'
 import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
 import { CircularArray } from '../circular-array'
+import { Queue } from '../queue'
 import {
   type IPool,
   PoolEmitter,
@@ -197,7 +198,7 @@ export abstract class AbstractPool<
       return 0
     }
     return this.workerNodes.reduce(
-      (accumulator, workerNode) => accumulator + workerNode.tasksQueue.length,
+      (accumulator, workerNode) => accumulator + workerNode.tasksQueue.size(),
       0
     )
   }
@@ -586,7 +587,7 @@ export abstract class AbstractPool<
         medRunTime: 0,
         error: 0
       },
-      tasksQueue: []
+      tasksQueue: new Queue<Task<Data>>()
     })
   }
 
@@ -602,7 +603,7 @@ export abstract class AbstractPool<
     workerNodeKey: number,
     worker: Worker,
     tasksUsage: TasksUsage,
-    tasksQueue: Array<Task<Data>>
+    tasksQueue: Queue<Task<Data>>
   ): void {
     this.workerNodes[workerNodeKey] = {
       worker,
@@ -628,21 +629,24 @@ export abstract class AbstractPool<
   }
 
   private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
-    return this.workerNodes[workerNodeKey].tasksQueue.push(task)
+    return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
   }
 
   private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
-    return this.workerNodes[workerNodeKey].tasksQueue.shift()
+    return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
   }
 
   private tasksQueueSize (workerNodeKey: number): number {
-    return this.workerNodes[workerNodeKey].tasksQueue.length
+    return this.workerNodes[workerNodeKey].tasksQueue.size()
   }
 
   private flushTasksQueue (workerNodeKey: number): void {
     if (this.tasksQueueSize(workerNodeKey) > 0) {
-      for (const task of this.workerNodes[workerNodeKey].tasksQueue) {
-        this.executeTask(workerNodeKey, task)
+      for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) {
+        this.executeTask(
+          workerNodeKey,
+          this.dequeueTask(workerNodeKey) as Task<Data>
+        )
       }
     }
   }
index adec0138d86e940686f1d60026616c0ce9c2c922..716376faf54ac6925cf4d4ff2ea4804e1d2cb38e 100644 (file)
@@ -1,4 +1,5 @@
 import type { CircularArray } from '../circular-array'
+import type { Queue } from '../queue'
 
 /**
  * Callback invoked if the worker has received a message.
@@ -124,5 +125,5 @@ export interface WorkerNode<Worker extends IWorker, Data = unknown> {
   /**
    * Worker node tasks queue.
    */
-  readonly tasksQueue: Array<Task<Data>>
+  readonly tasksQueue: Queue<Task<Data>>
 }
diff --git a/src/queue.ts b/src/queue.ts
new file mode 100644 (file)
index 0000000..5cc13fc
--- /dev/null
@@ -0,0 +1,37 @@
+/**
+ * Queue
+ */
+export class Queue<T> {
+  private items: Record<number, T>
+  private head: number
+  private tail: number
+
+  constructor () {
+    this.items = {}
+    this.head = 0
+    this.tail = 0
+  }
+
+  enqueue (item: T): number {
+    this.items[this.tail] = item
+    this.tail++
+    return this.size()
+  }
+
+  dequeue (): T | undefined {
+    if (this.size() <= 0) return undefined
+    const item = this.items[this.head]
+    // eslint-disable-next-line @typescript-eslint/no-dynamic-delete
+    delete this.items[this.head]
+    this.head++
+    if (this.head === this.tail) {
+      this.head = 0
+      this.tail = 0
+    }
+    return item
+  }
+
+  size (): number {
+    return this.tail - this.head
+  }
+}
index 2ea4649baedb03a74930eea37a1d39bbfdcea4de..eba4a896331ba0760045b5e626fe988a5e0a3ece 100644 (file)
@@ -7,6 +7,7 @@ const {
   WorkerChoiceStrategies
 } = require('../../../lib')
 const { CircularArray } = require('../../../lib/circular-array')
+const { Queue } = require('../../../lib/queue')
 
 describe('Abstract pool test suite', () => {
   const numberOfWorkers = 1
@@ -283,8 +284,8 @@ describe('Abstract pool test suite', () => {
     )
     for (const workerNode of pool.workerNodes) {
       expect(workerNode.tasksQueue).toBeDefined()
-      expect(workerNode.tasksQueue).toBeInstanceOf(Array)
-      expect(workerNode.tasksQueue.length).toBe(0)
+      expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
+      expect(workerNode.tasksQueue.size()).toBe(0)
     }
     await pool.destroy()
   })
index 21dcd78bfeb6f3016eb5826025198329a4f244c7..494876857665e50fb3a025e212f76221bdd929ab 100644 (file)
@@ -100,7 +100,7 @@ describe('Fixed cluster pool test suite', () => {
         queuePool.opts.tasksQueueOptions.concurrency
       )
       expect(workerNode.tasksUsage.run).toBe(0)
-      expect(workerNode.tasksQueue.length).toBeGreaterThan(0)
+      expect(workerNode.tasksQueue.size()).toBeGreaterThan(0)
     }
     expect(queuePool.numberOfRunningTasks).toBe(numberOfWorkers)
     expect(queuePool.numberOfQueuedTasks).toBe(
@@ -110,7 +110,7 @@ describe('Fixed cluster pool test suite', () => {
     for (const workerNode of queuePool.workerNodes) {
       expect(workerNode.tasksUsage.running).toBe(0)
       expect(workerNode.tasksUsage.run).toBeGreaterThan(0)
-      expect(workerNode.tasksQueue.length).toBe(0)
+      expect(workerNode.tasksQueue.size()).toBe(0)
     }
     promises.clear()
   })
index 1d07e0c7ca8c24dce32a340ffff31c49c1326aa6..7e7a74cfbdcdf226523eb613d0aa5b72d54266b0 100644 (file)
@@ -100,7 +100,7 @@ describe('Fixed thread pool test suite', () => {
         queuePool.opts.tasksQueueOptions.concurrency
       )
       expect(workerNode.tasksUsage.run).toBe(0)
-      expect(workerNode.tasksQueue.length).toBeGreaterThan(0)
+      expect(workerNode.tasksQueue.size()).toBeGreaterThan(0)
     }
     expect(queuePool.numberOfRunningTasks).toBe(numberOfThreads)
     expect(queuePool.numberOfQueuedTasks).toBe(
@@ -110,7 +110,7 @@ describe('Fixed thread pool test suite', () => {
     for (const workerNode of queuePool.workerNodes) {
       expect(workerNode.tasksUsage.running).toBe(0)
       expect(workerNode.tasksUsage.run).toBeGreaterThan(0)
-      expect(workerNode.tasksQueue.length).toBe(0)
+      expect(workerNode.tasksQueue.size()).toBe(0)
     }
     promises.clear()
   })