From 29ee7e9a3f325f87d889ef09ffc1eea4916a782f Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Fri, 5 May 2023 00:00:47 +0200 Subject: [PATCH] feat: use O(1) queue implementation MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 4 +++ src/pools/abstract-pool.ts | 20 +++++++----- src/pools/worker.ts | 3 +- src/queue.ts | 37 ++++++++++++++++++++++ tests/pools/abstract/abstract-pool.test.js | 5 +-- tests/pools/cluster/fixed.test.js | 4 +-- tests/pools/thread/fixed.test.js | 4 +-- 7 files changed, 62 insertions(+), 15 deletions(-) create mode 100644 src/queue.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 631e2d66..3d11d6f2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- Use O(1) queue implementation for tasks queueing. + ## [2.4.11] - 2023-04-23 ### Changed diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 2ee3a0cc..8ab6b26e 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -7,6 +7,7 @@ import { } from '../utils' import { KillBehaviors, isKillBehavior } from '../worker/worker-options' import { CircularArray } from '../circular-array' +import { Queue } from '../queue' import { type IPool, PoolEmitter, @@ -197,7 +198,7 @@ export abstract class AbstractPool< return 0 } return this.workerNodes.reduce( - (accumulator, workerNode) => accumulator + workerNode.tasksQueue.length, + (accumulator, workerNode) => accumulator + workerNode.tasksQueue.size(), 0 ) } @@ -586,7 +587,7 @@ export abstract class AbstractPool< medRunTime: 0, error: 0 }, - tasksQueue: [] + tasksQueue: new Queue>() }) } @@ -602,7 +603,7 @@ export abstract class AbstractPool< workerNodeKey: number, worker: Worker, tasksUsage: TasksUsage, - tasksQueue: Array> + tasksQueue: Queue> ): void { this.workerNodes[workerNodeKey] = { worker, @@ -628,21 +629,24 @@ export abstract class AbstractPool< } private enqueueTask (workerNodeKey: number, task: Task): number { - return this.workerNodes[workerNodeKey].tasksQueue.push(task) + return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task) } private dequeueTask (workerNodeKey: number): Task | undefined { - return this.workerNodes[workerNodeKey].tasksQueue.shift() + return this.workerNodes[workerNodeKey].tasksQueue.dequeue() } private tasksQueueSize (workerNodeKey: number): number { - return this.workerNodes[workerNodeKey].tasksQueue.length + return this.workerNodes[workerNodeKey].tasksQueue.size() } private flushTasksQueue (workerNodeKey: number): void { if (this.tasksQueueSize(workerNodeKey) > 0) { - for (const task of this.workerNodes[workerNodeKey].tasksQueue) { - this.executeTask(workerNodeKey, task) + for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) { + this.executeTask( + workerNodeKey, + this.dequeueTask(workerNodeKey) as Task + ) } } } diff --git a/src/pools/worker.ts b/src/pools/worker.ts index adec0138..716376fa 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -1,4 +1,5 @@ import type { CircularArray } from '../circular-array' +import type { Queue } from '../queue' /** * Callback invoked if the worker has received a message. @@ -124,5 +125,5 @@ export interface WorkerNode { /** * Worker node tasks queue. */ - readonly tasksQueue: Array> + readonly tasksQueue: Queue> } diff --git a/src/queue.ts b/src/queue.ts new file mode 100644 index 00000000..5cc13fcd --- /dev/null +++ b/src/queue.ts @@ -0,0 +1,37 @@ +/** + * Queue + */ +export class Queue { + private items: Record + private head: number + private tail: number + + constructor () { + this.items = {} + this.head = 0 + this.tail = 0 + } + + enqueue (item: T): number { + this.items[this.tail] = item + this.tail++ + return this.size() + } + + dequeue (): T | undefined { + if (this.size() <= 0) return undefined + const item = this.items[this.head] + // eslint-disable-next-line @typescript-eslint/no-dynamic-delete + delete this.items[this.head] + this.head++ + if (this.head === this.tail) { + this.head = 0 + this.tail = 0 + } + return item + } + + size (): number { + return this.tail - this.head + } +} diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index 2ea4649b..eba4a896 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -7,6 +7,7 @@ const { WorkerChoiceStrategies } = require('../../../lib') const { CircularArray } = require('../../../lib/circular-array') +const { Queue } = require('../../../lib/queue') describe('Abstract pool test suite', () => { const numberOfWorkers = 1 @@ -283,8 +284,8 @@ describe('Abstract pool test suite', () => { ) for (const workerNode of pool.workerNodes) { expect(workerNode.tasksQueue).toBeDefined() - expect(workerNode.tasksQueue).toBeInstanceOf(Array) - expect(workerNode.tasksQueue.length).toBe(0) + expect(workerNode.tasksQueue).toBeInstanceOf(Queue) + expect(workerNode.tasksQueue.size()).toBe(0) } await pool.destroy() }) diff --git a/tests/pools/cluster/fixed.test.js b/tests/pools/cluster/fixed.test.js index 21dcd78b..49487685 100644 --- a/tests/pools/cluster/fixed.test.js +++ b/tests/pools/cluster/fixed.test.js @@ -100,7 +100,7 @@ describe('Fixed cluster pool test suite', () => { queuePool.opts.tasksQueueOptions.concurrency ) expect(workerNode.tasksUsage.run).toBe(0) - expect(workerNode.tasksQueue.length).toBeGreaterThan(0) + expect(workerNode.tasksQueue.size()).toBeGreaterThan(0) } expect(queuePool.numberOfRunningTasks).toBe(numberOfWorkers) expect(queuePool.numberOfQueuedTasks).toBe( @@ -110,7 +110,7 @@ describe('Fixed cluster pool test suite', () => { for (const workerNode of queuePool.workerNodes) { expect(workerNode.tasksUsage.running).toBe(0) expect(workerNode.tasksUsage.run).toBeGreaterThan(0) - expect(workerNode.tasksQueue.length).toBe(0) + expect(workerNode.tasksQueue.size()).toBe(0) } promises.clear() }) diff --git a/tests/pools/thread/fixed.test.js b/tests/pools/thread/fixed.test.js index 1d07e0c7..7e7a74cf 100644 --- a/tests/pools/thread/fixed.test.js +++ b/tests/pools/thread/fixed.test.js @@ -100,7 +100,7 @@ describe('Fixed thread pool test suite', () => { queuePool.opts.tasksQueueOptions.concurrency ) expect(workerNode.tasksUsage.run).toBe(0) - expect(workerNode.tasksQueue.length).toBeGreaterThan(0) + expect(workerNode.tasksQueue.size()).toBeGreaterThan(0) } expect(queuePool.numberOfRunningTasks).toBe(numberOfThreads) expect(queuePool.numberOfQueuedTasks).toBe( @@ -110,7 +110,7 @@ describe('Fixed thread pool test suite', () => { for (const workerNode of queuePool.workerNodes) { expect(workerNode.tasksUsage.running).toBe(0) expect(workerNode.tasksUsage.run).toBeGreaterThan(0) - expect(workerNode.tasksQueue.length).toBe(0) + expect(workerNode.tasksQueue.size()).toBe(0) } promises.clear() }) -- 2.34.1