From e65c6cd9a3d6ed2e5b8af95120a5aa070101e945 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sun, 2 Apr 2023 22:10:47 +0200 Subject: [PATCH] perf: use a single array to store pool workers and their related data MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 47 +++++--------- src/pools/cluster/dynamic.ts | 2 +- src/pools/pool-internal.ts | 4 +- .../fair-share-worker-choice-strategy.ts | 4 +- .../less-busy-worker-choice-strategy.ts | 4 +- .../less-used-worker-choice-strategy.ts | 4 +- .../round-robin-worker-choice-strategy.ts | 5 +- ...hted-round-robin-worker-choice-strategy.ts | 11 ++-- src/pools/thread/dynamic.ts | 2 +- tests/pools/abstract/abstract-pool.test.js | 62 +++++++++---------- tests/pools/cluster/dynamic.test.js | 26 ++++---- tests/pools/thread/dynamic.test.js | 26 ++++---- tests/test-utils.js | 4 +- 13 files changed, 92 insertions(+), 109 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 947db17c..5ed353f6 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -29,19 +29,11 @@ export abstract class AbstractPool< Response = unknown > implements IPoolInternal { /** {@inheritDoc} */ - public readonly workers: Map> = new Map< - number, - WorkerType - >() + public readonly workers: Array> = [] /** {@inheritDoc} */ public readonly emitter?: PoolEmitter - /** - * Id of the next worker. - */ - protected nextWorkerId: number = 0 - /** * The promise map. * @@ -159,8 +151,8 @@ export abstract class AbstractPool< * @param worker - The worker. * @returns The worker key. */ - private getWorkerKey (worker: Worker): number | undefined { - return [...this.workers].find(([, value]) => value.worker === worker)?.[0] + private getWorkerKey (worker: Worker): number { + return this.workers.findIndex(workerItem => workerItem.worker === worker) } /** {@inheritDoc} */ @@ -168,8 +160,8 @@ export abstract class AbstractPool< workerChoiceStrategy: WorkerChoiceStrategy ): void { this.opts.workerChoiceStrategy = workerChoiceStrategy - for (const [key, value] of this.workers) { - this.setWorker(key, value.worker, { + for (const workerItem of this.workers) { + this.setWorker(workerItem.worker, { run: 0, running: 0, runTime: 0, @@ -193,10 +185,10 @@ export abstract class AbstractPool< /** {@inheritDoc} */ public findFreeWorker (): Worker | false { - for (const value of this.workers.values()) { - if (value.tasksUsage.running === 0) { + for (const workerItem of this.workers) { + if (workerItem.tasksUsage.running === 0) { // A worker is free, return the matching worker - return value.worker + return workerItem.worker } } return false @@ -220,8 +212,8 @@ export abstract class AbstractPool< /** {@inheritDoc} */ public async destroy (): Promise { await Promise.all( - [...this.workers].map(async ([, value]) => { - await this.destroyWorker(value.worker) + this.workers.map(async workerItem => { + await this.destroyWorker(workerItem.worker) }) ) } @@ -290,8 +282,7 @@ export abstract class AbstractPool< * @param worker - The worker that will be removed. */ protected removeWorker (worker: Worker): void { - this.workers.delete(this.getWorkerKey(worker) as number) - --this.nextWorkerId + this.workers.splice(this.getWorkerKey(worker), 1) } /** @@ -356,13 +347,12 @@ export abstract class AbstractPool< this.removeWorker(worker) }) - this.setWorker(this.nextWorkerId, worker, { + this.setWorker(worker, { run: 0, running: 0, runTime: 0, avgRunTime: 0 }) - ++this.nextWorkerId this.afterWorkerSetup(worker) @@ -410,8 +400,8 @@ export abstract class AbstractPool< /** {@inheritDoc} */ public getWorkerTasksUsage (worker: Worker): TasksUsage | undefined { const workerKey = this.getWorkerKey(worker) - if (workerKey !== undefined) { - return (this.workers.get(workerKey) as WorkerType).tasksUsage + if (workerKey !== -1) { + return this.workers[workerKey].tasksUsage } throw new Error('Worker could not be found in the pool') } @@ -419,16 +409,11 @@ export abstract class AbstractPool< /** * Sets the given worker. * - * @param workerKey - The worker key. * @param worker - The worker. * @param tasksUsage - The worker tasks usage. */ - private setWorker ( - workerKey: number, - worker: Worker, - tasksUsage: TasksUsage - ): void { - this.workers.set(workerKey, { + private setWorker (worker: Worker, tasksUsage: TasksUsage): void { + this.workers.push({ worker, tasksUsage }) diff --git a/src/pools/cluster/dynamic.ts b/src/pools/cluster/dynamic.ts index 0a2d0f38..8b8c5185 100644 --- a/src/pools/cluster/dynamic.ts +++ b/src/pools/cluster/dynamic.ts @@ -41,6 +41,6 @@ export class DynamicClusterPool< /** {@inheritDoc} */ public get busy (): boolean { - return this.workers.size === this.max + return this.workers.length === this.max } } diff --git a/src/pools/pool-internal.ts b/src/pools/pool-internal.ts index 9da64952..9489d96f 100644 --- a/src/pools/pool-internal.ts +++ b/src/pools/pool-internal.ts @@ -42,9 +42,9 @@ export interface IPoolInternal< Response = unknown > extends IPool { /** - * Pool workers map. + * Pool workers item array. */ - readonly workers: Map> + readonly workers: Array> /** * Pool type. diff --git a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts index 7415cf4e..48198371 100644 --- a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts @@ -46,8 +46,8 @@ export class FairShareWorkerChoiceStrategy< public choose (): Worker { let minWorkerVirtualTaskEndTimestamp = Infinity let chosenWorker!: Worker - for (const value of this.pool.workers.values()) { - const worker = value.worker + for (const workerItem of this.pool.workers) { + const worker = workerItem.worker this.computeWorkerLastVirtualTaskTimestamp(worker) const workerLastVirtualTaskEndTimestamp = this.workerLastVirtualTaskTimestamp.get(worker)?.end ?? 0 diff --git a/src/pools/selection-strategies/less-busy-worker-choice-strategy.ts b/src/pools/selection-strategies/less-busy-worker-choice-strategy.ts index 769f3fb6..31e7881a 100644 --- a/src/pools/selection-strategies/less-busy-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/less-busy-worker-choice-strategy.ts @@ -28,8 +28,8 @@ export class LessBusyWorkerChoiceStrategy< public choose (): Worker { let minRunTime = Infinity let lessBusyWorker!: Worker - for (const value of this.pool.workers.values()) { - const worker = value.worker + for (const workerItem of this.pool.workers) { + const worker = workerItem.worker const workerRunTime = this.pool.getWorkerTasksUsage(worker) ?.runTime as number if (!this.isDynamicPool && workerRunTime === 0) { diff --git a/src/pools/selection-strategies/less-used-worker-choice-strategy.ts b/src/pools/selection-strategies/less-used-worker-choice-strategy.ts index 61c8fb20..833e605b 100644 --- a/src/pools/selection-strategies/less-used-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/less-used-worker-choice-strategy.ts @@ -22,8 +22,8 @@ export class LessUsedWorkerChoiceStrategy< public choose (): Worker { let minNumberOfTasks = Infinity let lessUsedWorker!: Worker - for (const value of this.pool.workers.values()) { - const worker = value.worker + for (const workerItem of this.pool.workers) { + const worker = workerItem.worker const tasksUsage = this.pool.getWorkerTasksUsage(worker) const workerTasks = (tasksUsage?.run as number) + (tasksUsage?.running as number) diff --git a/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts index 44dfa4b7..a2327587 100644 --- a/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts @@ -26,10 +26,9 @@ export class RoundRobinWorkerChoiceStrategy< /** {@inheritDoc} */ public choose (): Worker { - const chosenWorker = this.pool.workers.get(this.nextWorkerId) - ?.worker as Worker + const chosenWorker = this.pool.workers[this.nextWorkerId]?.worker this.nextWorkerId = - this.nextWorkerId === this.pool.workers.size - 1 + this.nextWorkerId === this.pool.workers.length - 1 ? 0 : this.nextWorkerId + 1 return chosenWorker diff --git a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts index 1af7e656..9ee4fa65 100644 --- a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts @@ -67,8 +67,7 @@ export class WeightedRoundRobinWorkerChoiceStrategy< /** {@inheritDoc} */ public choose (): Worker { - const chosenWorker = this.pool.workers.get(this.currentWorkerId) - ?.worker as Worker + const chosenWorker = this.pool.workers[this.currentWorkerId]?.worker if (this.isDynamicPool && !this.workersTaskRunTime.has(chosenWorker)) { this.initWorkerTaskRunTime(chosenWorker) } @@ -86,11 +85,11 @@ export class WeightedRoundRobinWorkerChoiceStrategy< ) } else { this.currentWorkerId = - this.currentWorkerId === this.pool.workers.size - 1 + this.currentWorkerId === this.pool.workers.length - 1 ? 0 : this.currentWorkerId + 1 this.setWorkerTaskRunTime( - this.pool.workers.get(this.currentWorkerId)?.worker as Worker, + this.pool.workers[this.currentWorkerId]?.worker, workerTaskWeight, 0 ) @@ -99,8 +98,8 @@ export class WeightedRoundRobinWorkerChoiceStrategy< } private initWorkersTaskRunTime (): void { - for (const value of this.pool.workers.values()) { - this.initWorkerTaskRunTime(value.worker) + for (const workerItem of this.pool.workers) { + this.initWorkerTaskRunTime(workerItem.worker) } } diff --git a/src/pools/thread/dynamic.ts b/src/pools/thread/dynamic.ts index d5be43f1..19cb9fbf 100644 --- a/src/pools/thread/dynamic.ts +++ b/src/pools/thread/dynamic.ts @@ -42,6 +42,6 @@ export class DynamicThreadPool< /** {@inheritDoc} */ public get busy (): boolean { - return this.workers.size === this.max + return this.workers.length === this.max } } diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index 83290d6a..a87c8934 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -13,7 +13,7 @@ describe('Abstract pool test suite', () => { ) class StubPoolWithRemoveAllWorker extends FixedThreadPool { removeAllWorker () { - this.workers = new Map() + this.workers = [] this.promiseMap.clear() } } @@ -139,12 +139,12 @@ describe('Abstract pool test suite', () => { numberOfWorkers, './tests/worker-files/cluster/testWorker.js' ) - for (const value of pool.workers.values()) { - expect(value.tasksUsage).toBeDefined() - expect(value.tasksUsage.run).toBe(0) - expect(value.tasksUsage.running).toBe(0) - expect(value.tasksUsage.runTime).toBe(0) - expect(value.tasksUsage.avgRunTime).toBe(0) + for (const workerItem of pool.workers) { + expect(workerItem.tasksUsage).toBeDefined() + expect(workerItem.tasksUsage.run).toBe(0) + expect(workerItem.tasksUsage.running).toBe(0) + expect(workerItem.tasksUsage.runTime).toBe(0) + expect(workerItem.tasksUsage.avgRunTime).toBe(0) } await pool.destroy() }) @@ -158,20 +158,20 @@ describe('Abstract pool test suite', () => { for (let i = 0; i < numberOfWorkers * 2; i++) { promises.push(pool.execute()) } - for (const value of pool.workers.values()) { - expect(value.tasksUsage).toBeDefined() - expect(value.tasksUsage.run).toBe(0) - expect(value.tasksUsage.running).toBe(numberOfWorkers * 2) - expect(value.tasksUsage.runTime).toBe(0) - expect(value.tasksUsage.avgRunTime).toBe(0) + for (const workerItem of pool.workers) { + expect(workerItem.tasksUsage).toBeDefined() + expect(workerItem.tasksUsage.run).toBe(0) + expect(workerItem.tasksUsage.running).toBe(numberOfWorkers * 2) + expect(workerItem.tasksUsage.runTime).toBe(0) + expect(workerItem.tasksUsage.avgRunTime).toBe(0) } await Promise.all(promises) - for (const value of pool.workers.values()) { - expect(value.tasksUsage).toBeDefined() - expect(value.tasksUsage.run).toBe(numberOfWorkers * 2) - expect(value.tasksUsage.running).toBe(0) - expect(value.tasksUsage.runTime).toBeGreaterThanOrEqual(0) - expect(value.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0) + for (const workerItem of pool.workers) { + expect(workerItem.tasksUsage).toBeDefined() + expect(workerItem.tasksUsage.run).toBe(numberOfWorkers * 2) + expect(workerItem.tasksUsage.running).toBe(0) + expect(workerItem.tasksUsage.runTime).toBeGreaterThanOrEqual(0) + expect(workerItem.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0) } await pool.destroy() }) @@ -187,20 +187,20 @@ describe('Abstract pool test suite', () => { promises.push(pool.execute()) } await Promise.all(promises) - for (const value of pool.workers.values()) { - expect(value.tasksUsage).toBeDefined() - expect(value.tasksUsage.run).toBe(numberOfWorkers * 2) - expect(value.tasksUsage.running).toBe(0) - expect(value.tasksUsage.runTime).toBeGreaterThanOrEqual(0) - expect(value.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0) + for (const workerItem of pool.workers) { + expect(workerItem.tasksUsage).toBeDefined() + expect(workerItem.tasksUsage.run).toBe(numberOfWorkers * 2) + expect(workerItem.tasksUsage.running).toBe(0) + expect(workerItem.tasksUsage.runTime).toBeGreaterThanOrEqual(0) + expect(workerItem.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0) } pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE) - for (const value of pool.workers.values()) { - expect(value.tasksUsage).toBeDefined() - expect(value.tasksUsage.run).toBe(0) - expect(value.tasksUsage.running).toBe(0) - expect(value.tasksUsage.runTime).toBe(0) - expect(value.tasksUsage.avgRunTime).toBe(0) + for (const workerItem of pool.workers) { + expect(workerItem.tasksUsage).toBeDefined() + expect(workerItem.tasksUsage.run).toBe(0) + expect(workerItem.tasksUsage.running).toBe(0) + expect(workerItem.tasksUsage.runTime).toBe(0) + expect(workerItem.tasksUsage.avgRunTime).toBe(0) } await pool.destroy() }) diff --git a/tests/pools/cluster/dynamic.test.js b/tests/pools/cluster/dynamic.test.js index 7a834ca2..ddf42501 100644 --- a/tests/pools/cluster/dynamic.test.js +++ b/tests/pools/cluster/dynamic.test.js @@ -32,8 +32,8 @@ describe('Dynamic cluster pool test suite', () => { for (let i = 0; i < max * 2; i++) { pool.execute() } - expect(pool.workers.size).toBeLessThanOrEqual(max) - expect(pool.workers.size).toBeGreaterThan(min) + expect(pool.workers.length).toBeLessThanOrEqual(max) + expect(pool.workers.length).toBeGreaterThan(min) // The `busy` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool. // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool. expect(poolBusy).toBe(max + 1) @@ -42,19 +42,19 @@ describe('Dynamic cluster pool test suite', () => { }) it('Verify scale worker up and down is working', async () => { - expect(pool.workers.size).toBe(min) + expect(pool.workers.length).toBe(min) for (let i = 0; i < max * 10; i++) { pool.execute() } - expect(pool.workers.size).toBeGreaterThan(min) + expect(pool.workers.length).toBeGreaterThan(min) await TestUtils.waitExits(pool, max - min) - expect(pool.workers.size).toBe(min) + expect(pool.workers.length).toBe(min) for (let i = 0; i < max * 10; i++) { pool.execute() } - expect(pool.workers.size).toBeGreaterThan(min) + expect(pool.workers.length).toBeGreaterThan(min) await TestUtils.waitExits(pool, max - min) - expect(pool.workers.size).toBe(min) + expect(pool.workers.length).toBe(min) }) it('Shutdown test', async () => { @@ -93,14 +93,14 @@ describe('Dynamic cluster pool test suite', () => { exitHandler: () => console.log('long running worker exited') } ) - expect(longRunningPool.workers.size).toBe(min) + expect(longRunningPool.workers.length).toBe(min) for (let i = 0; i < max * 10; i++) { longRunningPool.execute() } - expect(longRunningPool.workers.size).toBe(max) + expect(longRunningPool.workers.length).toBe(max) await TestUtils.waitExits(longRunningPool, max - min) // Here we expect the workers to be at the max size since that the task is still running - expect(longRunningPool.workers.size).toBe(min) + expect(longRunningPool.workers.length).toBe(min) // We need to clean up the resources after our test await longRunningPool.destroy() }) @@ -116,14 +116,14 @@ describe('Dynamic cluster pool test suite', () => { exitHandler: () => console.log('long running worker exited') } ) - expect(longRunningPool.workers.size).toBe(min) + expect(longRunningPool.workers.length).toBe(min) for (let i = 0; i < max * 10; i++) { longRunningPool.execute() } - expect(longRunningPool.workers.size).toBe(max) + expect(longRunningPool.workers.length).toBe(max) await TestUtils.sleep(1500) // Here we expect the workers to be at the max size since that the task is still running - expect(longRunningPool.workers.size).toBe(max) + expect(longRunningPool.workers.length).toBe(max) // We need to clean up the resources after our test await longRunningPool.destroy() }) diff --git a/tests/pools/thread/dynamic.test.js b/tests/pools/thread/dynamic.test.js index b544452f..9036642f 100644 --- a/tests/pools/thread/dynamic.test.js +++ b/tests/pools/thread/dynamic.test.js @@ -32,8 +32,8 @@ describe('Dynamic thread pool test suite', () => { for (let i = 0; i < max * 2; i++) { pool.execute() } - expect(pool.workers.size).toBeLessThanOrEqual(max) - expect(pool.workers.size).toBeGreaterThan(min) + expect(pool.workers.length).toBeLessThanOrEqual(max) + expect(pool.workers.length).toBeGreaterThan(min) // The `busy` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool. // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool. expect(poolBusy).toBe(max + 1) @@ -42,19 +42,19 @@ describe('Dynamic thread pool test suite', () => { }) it('Verify scale thread up and down is working', async () => { - expect(pool.workers.size).toBe(min) + expect(pool.workers.length).toBe(min) for (let i = 0; i < max * 10; i++) { pool.execute() } - expect(pool.workers.size).toBe(max) + expect(pool.workers.length).toBe(max) await TestUtils.waitExits(pool, max - min) - expect(pool.workers.size).toBe(min) + expect(pool.workers.length).toBe(min) for (let i = 0; i < max * 10; i++) { pool.execute() } - expect(pool.workers.size).toBe(max) + expect(pool.workers.length).toBe(max) await TestUtils.waitExits(pool, max - min) - expect(pool.workers.size).toBe(min) + expect(pool.workers.length).toBe(min) }) it('Shutdown test', async () => { @@ -93,13 +93,13 @@ describe('Dynamic thread pool test suite', () => { exitHandler: () => console.log('long running worker exited') } ) - expect(longRunningPool.workers.size).toBe(min) + expect(longRunningPool.workers.length).toBe(min) for (let i = 0; i < max * 10; i++) { longRunningPool.execute() } - expect(longRunningPool.workers.size).toBe(max) + expect(longRunningPool.workers.length).toBe(max) await TestUtils.waitExits(longRunningPool, max - min) - expect(longRunningPool.workers.size).toBe(min) + expect(longRunningPool.workers.length).toBe(min) // We need to clean up the resources after our test await longRunningPool.destroy() }) @@ -115,14 +115,14 @@ describe('Dynamic thread pool test suite', () => { exitHandler: () => console.log('long running worker exited') } ) - expect(longRunningPool.workers.size).toBe(min) + expect(longRunningPool.workers.length).toBe(min) for (let i = 0; i < max * 10; i++) { longRunningPool.execute() } - expect(longRunningPool.workers.size).toBe(max) + expect(longRunningPool.workers.length).toBe(max) await TestUtils.sleep(1500) // Here we expect the workers to be at the max size since that the task is still running - expect(longRunningPool.workers.size).toBe(max) + expect(longRunningPool.workers.length).toBe(max) // We need to clean up the resources after our test await longRunningPool.destroy() }) diff --git a/tests/test-utils.js b/tests/test-utils.js index 7314f261..6d3592c1 100644 --- a/tests/test-utils.js +++ b/tests/test-utils.js @@ -4,8 +4,8 @@ class TestUtils { static async waitExits (pool, numberOfExitEventsToWait) { return new Promise(resolve => { let exitEvents = 0 - for (const value of pool.workers.values()) { - value.worker.on('exit', () => { + for (const workerItem of pool.workers) { + workerItem.worker.on('exit', () => { ++exitEvents if (exitEvents === numberOfExitEventsToWait) { resolve(exitEvents) -- 2.34.1