From c923ce5670eeae4194aa996d44a1071e88cb21ad Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Mon, 3 Apr 2023 17:13:07 +0200 Subject: [PATCH] perf: use worker key as much as possible instead of a reference to the worker instance MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 92 ++++++++++++------- src/pools/pool-internal.ts | 16 +--- .../abstract-worker-choice-strategy.ts | 4 +- .../dynamic-pool-worker-choice-strategy.ts | 18 ++-- .../fair-share-worker-choice-strategy.ts | 29 +++--- .../less-busy-worker-choice-strategy.ts | 16 ++-- .../less-used-worker-choice-strategy.ts | 18 ++-- .../round-robin-worker-choice-strategy.ts | 6 +- .../selection-strategies-types.ts | 10 +- .../selection-strategies-utils.ts | 14 +-- ...hted-round-robin-worker-choice-strategy.ts | 44 ++++----- .../worker-choice-strategy-context.ts | 16 ++-- src/utility-types.ts | 10 +- tests/pools/abstract/abstract-pool.test.js | 7 +- .../selection-strategies.test.js | 38 ++++---- .../worker-choice-strategy-context.test.js | 12 +-- 16 files changed, 183 insertions(+), 167 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 62def61e..172feda0 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -35,12 +35,14 @@ export abstract class AbstractPool< * The promise response map. * * - `key`: The message id of each submitted task. - * - `value`: An object that contains the worker key, the promise resolve and reject callbacks. + * - `value`: An object that contains the worker, the promise resolve and reject callbacks. * * When we receive a message from the worker we get a map entry with the promise resolve/reject bound to the message. */ - protected promiseResponseMap: Map> = - new Map>() + protected promiseResponseMap: Map< + string, + PromiseResponseWrapper + > = new Map>() /** * Worker choice strategy instance implementing the worker choice algorithm. @@ -83,17 +85,17 @@ export abstract class AbstractPool< this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext( this, () => { - const workerCreated = this.createAndSetupWorker() - this.registerWorkerMessageListener(workerCreated, message => { + const createdWorker = this.createAndSetupWorker() + this.registerWorkerMessageListener(createdWorker, message => { if ( isKillBehavior(KillBehaviors.HARD, message.kill) || - this.getWorkerTasksUsage(workerCreated)?.running === 0 + this.getWorkerTasksUsage(createdWorker)?.running === 0 ) { // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime) - void this.destroyWorker(workerCreated) + void this.destroyWorker(createdWorker) } }) - return workerCreated + return this.getWorkerKey(createdWorker) }, this.opts.workerChoiceStrategy ) @@ -155,8 +157,8 @@ export abstract class AbstractPool< workerChoiceStrategy: WorkerChoiceStrategy ): void { this.opts.workerChoiceStrategy = workerChoiceStrategy - for (const workerItem of this.workers) { - this.setWorker(workerItem.worker, { + for (const [index, workerItem] of this.workers.entries()) { + this.setWorker(index, workerItem.worker, { run: 0, running: 0, runTime: 0, @@ -175,26 +177,23 @@ export abstract class AbstractPool< protected internalGetBusyStatus (): boolean { return ( this.numberOfRunningTasks >= this.numberOfWorkers && - this.findFreeWorker() === false + this.findFreeWorkerKey() === false ) } /** {@inheritDoc} */ - public findFreeWorker (): Worker | false { - for (const workerItem of this.workers) { - if (workerItem.tasksUsage.running === 0) { - // A worker is free, return the matching worker - return workerItem.worker - } - } - return false + public findFreeWorkerKey (): number | false { + const freeWorkerKey = this.workers.findIndex(workerItem => { + return workerItem.tasksUsage.running === 0 + }) + return freeWorkerKey !== -1 ? freeWorkerKey : false } /** {@inheritDoc} */ public async execute (data: Data): Promise { - const worker = this.chooseWorker() + const [workerKey, worker] = this.chooseWorker() const messageId = crypto.randomUUID() - const res = this.internalExecute(this.getWorkerKey(worker), messageId) + const res = this.internalExecute(workerKey, worker, messageId) this.checkAndEmitBusy() this.sendToWorker(worker, { // eslint-disable-next-line @typescript-eslint/consistent-type-assertions @@ -248,14 +247,14 @@ export abstract class AbstractPool< * Hook executed after the worker task promise resolution. * Can be overridden. * - * @param workerKey - The worker key. + * @param worker - The worker. * @param message - The received message. */ protected afterPromiseResponseHook ( - workerKey: number, + worker: Worker, message: MessageValue ): void { - const workerTasksUsage = this.workers[workerKey].tasksUsage + const workerTasksUsage = this.getWorkerTasksUsage(worker) as TasksUsage --workerTasksUsage.running ++workerTasksUsage.run if (message.error != null) { @@ -287,10 +286,11 @@ export abstract class AbstractPool< * * The default implementation uses a round robin algorithm to distribute the load. * - * @returns Worker. + * @returns [worker key, worker]. */ - protected chooseWorker (): Worker { - return this.workerChoiceStrategyContext.execute() + protected chooseWorker (): [number, Worker] { + const workerKey = this.workerChoiceStrategyContext.execute() + return [workerKey, this.workers[workerKey].worker] } /** @@ -344,7 +344,7 @@ export abstract class AbstractPool< this.removeWorker(worker) }) - this.setWorker(worker, { + this.pushWorker(worker, { run: 0, running: 0, runTime: 0, @@ -372,7 +372,7 @@ export abstract class AbstractPool< } else { promiseResponse.resolve(message.data as Response) } - this.afterPromiseResponseHook(promiseResponse.workerKey, message) + this.afterPromiseResponseHook(promiseResponse.worker, message) this.promiseResponseMap.delete(message.id) } } @@ -381,11 +381,12 @@ export abstract class AbstractPool< private async internalExecute ( workerKey: number, + worker: Worker, messageId: string ): Promise { this.beforePromiseResponseHook(workerKey) return await new Promise((resolve, reject) => { - this.promiseResponseMap.set(messageId, { resolve, reject, workerKey }) + this.promiseResponseMap.set(messageId, { resolve, reject, worker }) }) } @@ -395,8 +396,13 @@ export abstract class AbstractPool< } } - /** {@inheritDoc} */ - public getWorkerTasksUsage (worker: Worker): TasksUsage | undefined { + /** + * Gets worker tasks usage. + * + * @param worker - The worker. + * @returns The worker tasks usage. + */ + private getWorkerTasksUsage (worker: Worker): TasksUsage | undefined { const workerKey = this.getWorkerKey(worker) if (workerKey !== -1) { return this.workers[workerKey].tasksUsage @@ -405,15 +411,33 @@ export abstract class AbstractPool< } /** - * Sets the given worker. + * Pushes the given worker. * * @param worker - The worker. * @param tasksUsage - The worker tasks usage. */ - private setWorker (worker: Worker, tasksUsage: TasksUsage): void { + private pushWorker (worker: Worker, tasksUsage: TasksUsage): void { this.workers.push({ worker, tasksUsage }) } + + /** + * Sets the given worker. + * + * @param workerKey - The worker key. + * @param worker - The worker. + * @param tasksUsage - The worker tasks usage. + */ + private setWorker ( + workerKey: number, + worker: Worker, + tasksUsage: TasksUsage + ): void { + this.workers[workerKey] = { + worker, + tasksUsage + } + } } diff --git a/src/pools/pool-internal.ts b/src/pools/pool-internal.ts index bd75cf4f..261a9d7b 100644 --- a/src/pools/pool-internal.ts +++ b/src/pools/pool-internal.ts @@ -67,21 +67,13 @@ export interface IPoolInternal< readonly numberOfRunningTasks: number /** - * Finds a free worker based on the number of tasks the worker has applied. + * Finds a free worker key based on the number of tasks the worker has applied. * - * If a worker is found with `0` running tasks, it is detected as free and returned. + * If a worker is found with `0` running tasks, it is detected as free and its key is returned. * * If no free worker is found, `false` is returned. * - * @returns A free worker if there is one, otherwise `false`. + * @returns A worker key if there is one, otherwise `false`. */ - findFreeWorker: () => Worker | false - - /** - * Gets worker tasks usage. - * - * @param worker - The worker. - * @returns The tasks usage on the worker. - */ - getWorkerTasksUsage: (worker: Worker) => TasksUsage | undefined + findFreeWorkerKey: () => number | false } diff --git a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts index ebe74dcb..5b526331 100644 --- a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts @@ -17,7 +17,7 @@ export abstract class AbstractWorkerChoiceStrategy< Worker extends IPoolWorker, Data, Response -> implements IWorkerChoiceStrategy { +> implements IWorkerChoiceStrategy { /** {@inheritDoc} */ public readonly isDynamicPool: boolean /** {@inheritDoc} */ @@ -40,5 +40,5 @@ export abstract class AbstractWorkerChoiceStrategy< public abstract reset (): boolean /** {@inheritDoc} */ - public abstract choose (): Worker + public abstract choose (): number } diff --git a/src/pools/selection-strategies/dynamic-pool-worker-choice-strategy.ts b/src/pools/selection-strategies/dynamic-pool-worker-choice-strategy.ts index 5f07d607..c122c2b1 100644 --- a/src/pools/selection-strategies/dynamic-pool-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/dynamic-pool-worker-choice-strategy.ts @@ -20,18 +20,18 @@ export class DynamicPoolWorkerChoiceStrategy< Data, Response > extends AbstractWorkerChoiceStrategy { - private readonly workerChoiceStrategy: IWorkerChoiceStrategy + private readonly workerChoiceStrategy: IWorkerChoiceStrategy /** * Constructs a worker choice strategy for dynamic pool. * * @param pool - The pool instance. - * @param createDynamicallyWorkerCallback - The worker creation callback for dynamic pool. - * @param workerChoiceStrategy - The worker choice strategy when the pull is busy. + * @param createWorkerCallback - The worker creation callback for dynamic pool. + * @param workerChoiceStrategy - The worker choice strategy when the pool is busy. */ public constructor ( pool: IPoolInternal, - private readonly createDynamicallyWorkerCallback: () => Worker, + private readonly createWorkerCallback: () => number, workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN ) { super(pool) @@ -48,10 +48,10 @@ export class DynamicPoolWorkerChoiceStrategy< } /** {@inheritDoc} */ - public choose (): Worker { - const freeWorker = this.pool.findFreeWorker() - if (freeWorker !== false) { - return freeWorker + public choose (): number { + const freeWorkerKey = this.pool.findFreeWorkerKey() + if (freeWorkerKey !== false) { + return freeWorkerKey } if (this.pool.busy) { @@ -59,6 +59,6 @@ export class DynamicPoolWorkerChoiceStrategy< } // All workers are busy, create a new worker - return this.createDynamicallyWorkerCallback() + return this.createWorkerCallback() } } diff --git a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts index 48198371..af8eb75d 100644 --- a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts @@ -32,9 +32,9 @@ export class FairShareWorkerChoiceStrategy< * Worker last virtual task execution timestamp. */ private readonly workerLastVirtualTaskTimestamp: Map< - Worker, + number, WorkerVirtualTaskTimestamp - > = new Map() + > = new Map() /** {@inheritDoc} */ public reset (): boolean { @@ -43,39 +43,38 @@ export class FairShareWorkerChoiceStrategy< } /** {@inheritDoc} */ - public choose (): Worker { + public choose (): number { let minWorkerVirtualTaskEndTimestamp = Infinity - let chosenWorker!: Worker - for (const workerItem of this.pool.workers) { - const worker = workerItem.worker - this.computeWorkerLastVirtualTaskTimestamp(worker) + let chosenWorkerKey!: number + for (const [index] of this.pool.workers.entries()) { + this.computeWorkerLastVirtualTaskTimestamp(index) const workerLastVirtualTaskEndTimestamp = - this.workerLastVirtualTaskTimestamp.get(worker)?.end ?? 0 + this.workerLastVirtualTaskTimestamp.get(index)?.end ?? 0 if ( workerLastVirtualTaskEndTimestamp < minWorkerVirtualTaskEndTimestamp ) { minWorkerVirtualTaskEndTimestamp = workerLastVirtualTaskEndTimestamp - chosenWorker = worker + chosenWorkerKey = index } } - return chosenWorker + return chosenWorkerKey } /** * Computes worker last virtual task timestamp. * - * @param worker - The worker. + * @param workerKey - The worker key. */ - private computeWorkerLastVirtualTaskTimestamp (worker: Worker): void { + private computeWorkerLastVirtualTaskTimestamp (workerKey: number): void { const workerVirtualTaskStartTimestamp = Math.max( Date.now(), - this.workerLastVirtualTaskTimestamp.get(worker)?.end ?? -Infinity + this.workerLastVirtualTaskTimestamp.get(workerKey)?.end ?? -Infinity ) - this.workerLastVirtualTaskTimestamp.set(worker, { + this.workerLastVirtualTaskTimestamp.set(workerKey, { start: workerVirtualTaskStartTimestamp, end: workerVirtualTaskStartTimestamp + - (this.pool.getWorkerTasksUsage(worker)?.avgRunTime ?? 0) + (this.pool.workers[workerKey].tasksUsage.avgRunTime ?? 0) }) } } diff --git a/src/pools/selection-strategies/less-busy-worker-choice-strategy.ts b/src/pools/selection-strategies/less-busy-worker-choice-strategy.ts index 31e7881a..2bbafb59 100644 --- a/src/pools/selection-strategies/less-busy-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/less-busy-worker-choice-strategy.ts @@ -25,20 +25,18 @@ export class LessBusyWorkerChoiceStrategy< } /** {@inheritDoc} */ - public choose (): Worker { + public choose (): number { let minRunTime = Infinity - let lessBusyWorker!: Worker - for (const workerItem of this.pool.workers) { - const worker = workerItem.worker - const workerRunTime = this.pool.getWorkerTasksUsage(worker) - ?.runTime as number + let lessBusyWorkerKey!: number + for (const [index, workerItem] of this.pool.workers.entries()) { + const workerRunTime = workerItem.tasksUsage.runTime if (!this.isDynamicPool && workerRunTime === 0) { - return worker + return index } else if (workerRunTime < minRunTime) { minRunTime = workerRunTime - lessBusyWorker = worker + lessBusyWorkerKey = index } } - return lessBusyWorker + return lessBusyWorkerKey } } diff --git a/src/pools/selection-strategies/less-used-worker-choice-strategy.ts b/src/pools/selection-strategies/less-used-worker-choice-strategy.ts index 833e605b..2ed58c96 100644 --- a/src/pools/selection-strategies/less-used-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/less-used-worker-choice-strategy.ts @@ -19,21 +19,19 @@ export class LessUsedWorkerChoiceStrategy< } /** {@inheritDoc} */ - public choose (): Worker { + public choose (): number { let minNumberOfTasks = Infinity - let lessUsedWorker!: Worker - for (const workerItem of this.pool.workers) { - const worker = workerItem.worker - const tasksUsage = this.pool.getWorkerTasksUsage(worker) - const workerTasks = - (tasksUsage?.run as number) + (tasksUsage?.running as number) + let lessUsedWorkerKey!: number + for (const [index, workerItem] of this.pool.workers.entries()) { + const tasksUsage = workerItem.tasksUsage + const workerTasks = tasksUsage?.run + tasksUsage?.running if (!this.isDynamicPool && workerTasks === 0) { - return worker + return index } else if (workerTasks < minNumberOfTasks) { minNumberOfTasks = workerTasks - lessUsedWorker = worker + lessUsedWorkerKey = index } } - return lessUsedWorker + return lessUsedWorkerKey } } diff --git a/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts index 9bf49a3c..e265172f 100644 --- a/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts @@ -25,12 +25,12 @@ export class RoundRobinWorkerChoiceStrategy< } /** {@inheritDoc} */ - public choose (): Worker { - const chosenWorker = this.pool.workers[this.nextWorkerId].worker + public choose (): number { + const chosenWorkerKey = this.nextWorkerId this.nextWorkerId = this.nextWorkerId === this.pool.workers.length - 1 ? 0 : this.nextWorkerId + 1 - return chosenWorker + return chosenWorkerKey } } diff --git a/src/pools/selection-strategies/selection-strategies-types.ts b/src/pools/selection-strategies/selection-strategies-types.ts index 9ae66765..a05fd9b7 100644 --- a/src/pools/selection-strategies/selection-strategies-types.ts +++ b/src/pools/selection-strategies/selection-strategies-types.ts @@ -1,5 +1,3 @@ -import type { IPoolWorker } from '../pool-worker' - /** * Enumeration of worker choice strategies. */ @@ -40,10 +38,8 @@ export interface RequiredStatistics { /** * Worker choice strategy interface. - * - * @typeParam Worker - Type of worker which manages the strategy. */ -export interface IWorkerChoiceStrategy { +export interface IWorkerChoiceStrategy { /** * Is the pool attached to the strategy dynamic?. */ @@ -57,7 +53,7 @@ export interface IWorkerChoiceStrategy { */ reset: () => boolean /** - * Chooses a worker in the pool. + * Chooses a worker in the pool and returns its key. */ - choose: () => Worker + choose: () => number } diff --git a/src/pools/selection-strategies/selection-strategies-utils.ts b/src/pools/selection-strategies/selection-strategies-utils.ts index 1330ec19..10c93335 100644 --- a/src/pools/selection-strategies/selection-strategies-utils.ts +++ b/src/pools/selection-strategies/selection-strategies-utils.ts @@ -25,18 +25,20 @@ export function getWorkerChoiceStrategy< > ( pool: IPoolInternal, workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN -): IWorkerChoiceStrategy { +): IWorkerChoiceStrategy { switch (workerChoiceStrategy) { case WorkerChoiceStrategies.ROUND_ROBIN: - return new RoundRobinWorkerChoiceStrategy(pool) + return new RoundRobinWorkerChoiceStrategy(pool) case WorkerChoiceStrategies.LESS_USED: - return new LessUsedWorkerChoiceStrategy(pool) + return new LessUsedWorkerChoiceStrategy(pool) case WorkerChoiceStrategies.LESS_BUSY: - return new LessBusyWorkerChoiceStrategy(pool) + return new LessBusyWorkerChoiceStrategy(pool) case WorkerChoiceStrategies.FAIR_SHARE: - return new FairShareWorkerChoiceStrategy(pool) + return new FairShareWorkerChoiceStrategy(pool) case WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN: - return new WeightedRoundRobinWorkerChoiceStrategy(pool) + return new WeightedRoundRobinWorkerChoiceStrategy( + pool + ) default: throw new Error( // eslint-disable-next-line @typescript-eslint/restrict-template-expressions diff --git a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts index 5c5e6474..e3d2be3f 100644 --- a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts @@ -41,8 +41,8 @@ export class WeightedRoundRobinWorkerChoiceStrategy< /** * Per worker virtual task runtime map. */ - private readonly workersTaskRunTime: Map = new Map< - Worker, + private readonly workersTaskRunTime: Map = new Map< + number, TaskRunTime >() @@ -66,60 +66,56 @@ export class WeightedRoundRobinWorkerChoiceStrategy< } /** {@inheritDoc} */ - public choose (): Worker { - const chosenWorker = this.pool.workers[this.currentWorkerId].worker - if (this.isDynamicPool && !this.workersTaskRunTime.has(chosenWorker)) { - this.initWorkerTaskRunTime(chosenWorker) + public choose (): number { + const chosenWorkerKey = this.currentWorkerId + if (this.isDynamicPool && !this.workersTaskRunTime.has(chosenWorkerKey)) { + this.initWorkerTaskRunTime(chosenWorkerKey) } const workerTaskRunTime = - this.workersTaskRunTime.get(chosenWorker)?.runTime ?? 0 + this.workersTaskRunTime.get(chosenWorkerKey)?.runTime ?? 0 const workerTaskWeight = - this.workersTaskRunTime.get(chosenWorker)?.weight ?? + this.workersTaskRunTime.get(chosenWorkerKey)?.weight ?? this.defaultWorkerWeight if (workerTaskRunTime < workerTaskWeight) { this.setWorkerTaskRunTime( - chosenWorker, + chosenWorkerKey, workerTaskWeight, workerTaskRunTime + - (this.getWorkerVirtualTaskRunTime(chosenWorker) ?? 0) + (this.getWorkerVirtualTaskRunTime(chosenWorkerKey) ?? 0) ) } else { this.currentWorkerId = this.currentWorkerId === this.pool.workers.length - 1 ? 0 : this.currentWorkerId + 1 - this.setWorkerTaskRunTime( - this.pool.workers[this.currentWorkerId].worker, - workerTaskWeight, - 0 - ) + this.setWorkerTaskRunTime(this.currentWorkerId, workerTaskWeight, 0) } - return chosenWorker + return chosenWorkerKey } private initWorkersTaskRunTime (): void { - for (const workerItem of this.pool.workers) { - this.initWorkerTaskRunTime(workerItem.worker) + for (const [index] of this.pool.workers.entries()) { + this.initWorkerTaskRunTime(index) } } - private initWorkerTaskRunTime (worker: Worker): void { - this.setWorkerTaskRunTime(worker, this.defaultWorkerWeight, 0) + private initWorkerTaskRunTime (workerKey: number): void { + this.setWorkerTaskRunTime(workerKey, this.defaultWorkerWeight, 0) } private setWorkerTaskRunTime ( - worker: Worker, + workerKey: number, weight: number, runTime: number ): void { - this.workersTaskRunTime.set(worker, { + this.workersTaskRunTime.set(workerKey, { weight, runTime }) } - private getWorkerVirtualTaskRunTime (worker: Worker): number | undefined { - return this.pool.getWorkerTasksUsage(worker)?.avgRunTime + private getWorkerVirtualTaskRunTime (workerKey: number): number { + return this.pool.workers[workerKey].tasksUsage.avgRunTime } private computeWorkerWeight (): number { diff --git a/src/pools/selection-strategies/worker-choice-strategy-context.ts b/src/pools/selection-strategies/worker-choice-strategy-context.ts index 1bba1a55..c149cc2e 100644 --- a/src/pools/selection-strategies/worker-choice-strategy-context.ts +++ b/src/pools/selection-strategies/worker-choice-strategy-context.ts @@ -21,18 +21,18 @@ export class WorkerChoiceStrategyContext< Data, Response > { - private workerChoiceStrategy!: IWorkerChoiceStrategy + private workerChoiceStrategy!: IWorkerChoiceStrategy /** * Worker choice strategy context constructor. * * @param pool - The pool instance. - * @param createDynamicallyWorkerCallback - The worker creation callback for dynamic pool. + * @param createWorkerCallback - The worker creation callback for dynamic pool. * @param workerChoiceStrategy - The worker choice strategy. */ public constructor ( private readonly pool: IPoolInternal, - private readonly createDynamicallyWorkerCallback: () => Worker, + private readonly createWorkerCallback: () => number, workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN ) { this.setWorkerChoiceStrategy(workerChoiceStrategy) @@ -46,11 +46,11 @@ export class WorkerChoiceStrategyContext< */ private getPoolWorkerChoiceStrategy ( workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN - ): IWorkerChoiceStrategy { + ): IWorkerChoiceStrategy { if (this.pool.type === PoolType.DYNAMIC) { return new DynamicPoolWorkerChoiceStrategy( this.pool, - this.createDynamicallyWorkerCallback, + this.createWorkerCallback, workerChoiceStrategy ) } @@ -62,7 +62,7 @@ export class WorkerChoiceStrategyContext< * * @returns The worker choice strategy. */ - public getWorkerChoiceStrategy (): IWorkerChoiceStrategy { + public getWorkerChoiceStrategy (): IWorkerChoiceStrategy { return this.workerChoiceStrategy } @@ -82,9 +82,9 @@ export class WorkerChoiceStrategyContext< /** * Chooses a worker with the underlying selection strategy. * - * @returns The chosen one. + * @returns The key of the chosen one. */ - public execute (): Worker { + public execute (): number { return this.workerChoiceStrategy.choose() } } diff --git a/src/utility-types.ts b/src/utility-types.ts index 54bb7461..0fdaa36e 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -1,6 +1,7 @@ import type { Worker as ClusterWorker } from 'node:cluster' import type { MessagePort } from 'node:worker_threads' import type { KillBehavior } from './worker/worker-options' +import type { IPoolWorker } from './pools/pool-worker' /** * Make all properties in T non-readonly. @@ -47,7 +48,10 @@ export interface MessageValue< * * @typeParam Response - Type of execution response. This can only be serializable data. */ -export interface PromiseResponseWrapper { +export interface PromiseResponseWrapper< + Worker extends IPoolWorker, + Response = unknown +> { /** * Resolve callback to fulfill the promise. */ @@ -57,7 +61,7 @@ export interface PromiseResponseWrapper { */ readonly reject: (reason?: string) => void /** - * The worker handling the promise key . + * The worker handling the promise. */ - readonly workerKey: number + readonly worker: Worker } diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index a87c8934..5bb5cde5 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -14,7 +14,7 @@ describe('Abstract pool test suite', () => { class StubPoolWithRemoveAllWorker extends FixedThreadPool { removeAllWorker () { this.workers = [] - this.promiseMap.clear() + this.promiseResponseMap.clear() } } class StubPoolWithIsMain extends FixedThreadPool { @@ -145,6 +145,7 @@ describe('Abstract pool test suite', () => { expect(workerItem.tasksUsage.running).toBe(0) expect(workerItem.tasksUsage.runTime).toBe(0) expect(workerItem.tasksUsage.avgRunTime).toBe(0) + expect(workerItem.tasksUsage.error).toBe(0) } await pool.destroy() }) @@ -164,6 +165,7 @@ describe('Abstract pool test suite', () => { expect(workerItem.tasksUsage.running).toBe(numberOfWorkers * 2) expect(workerItem.tasksUsage.runTime).toBe(0) expect(workerItem.tasksUsage.avgRunTime).toBe(0) + expect(workerItem.tasksUsage.error).toBe(0) } await Promise.all(promises) for (const workerItem of pool.workers) { @@ -172,6 +174,7 @@ describe('Abstract pool test suite', () => { expect(workerItem.tasksUsage.running).toBe(0) expect(workerItem.tasksUsage.runTime).toBeGreaterThanOrEqual(0) expect(workerItem.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0) + expect(workerItem.tasksUsage.error).toBe(0) } await pool.destroy() }) @@ -193,6 +196,7 @@ describe('Abstract pool test suite', () => { expect(workerItem.tasksUsage.running).toBe(0) expect(workerItem.tasksUsage.runTime).toBeGreaterThanOrEqual(0) expect(workerItem.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0) + expect(workerItem.tasksUsage.error).toBe(0) } pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE) for (const workerItem of pool.workers) { @@ -201,6 +205,7 @@ describe('Abstract pool test suite', () => { expect(workerItem.tasksUsage.running).toBe(0) expect(workerItem.tasksUsage.runTime).toBe(0) expect(workerItem.tasksUsage.avgRunTime).toBe(0) + expect(workerItem.tasksUsage.error).toBe(0) } await pool.destroy() }) diff --git a/tests/pools/selection-strategies/selection-strategies.test.js b/tests/pools/selection-strategies/selection-strategies.test.js index ac3e2720..a6257179 100644 --- a/tests/pools/selection-strategies/selection-strategies.test.js +++ b/tests/pools/selection-strategies/selection-strategies.test.js @@ -134,14 +134,14 @@ describe('Selection strategies test suite', () => { ) let results = new Set() for (let i = 0; i < max; i++) { - results.add(pool.chooseWorker().id) + results.add(pool.chooseWorker()[1].id) } expect(results.size).toBe(max) await pool.destroy() pool = new FixedThreadPool(max, './tests/worker-files/thread/testWorker.js') results = new Set() for (let i = 0; i < max; i++) { - results.add(pool.chooseWorker().threadId) + results.add(pool.chooseWorker()[1].threadId) } expect(results.size).toBe(max) await pool.destroy() @@ -357,18 +357,18 @@ describe('Selection strategies test suite', () => { expect(pool.opts.workerChoiceStrategy).toBe( WorkerChoiceStrategies.FAIR_SHARE ) - for (const worker of pool.workerChoiceStrategyContext + for (const workerKey of pool.workerChoiceStrategyContext .getWorkerChoiceStrategy() .workerLastVirtualTaskTimestamp.keys()) { expect( pool.workerChoiceStrategyContext .getWorkerChoiceStrategy() - .workerLastVirtualTaskTimestamp.get(worker).start + .workerLastVirtualTaskTimestamp.get(workerKey).start ).toBe(0) expect( pool.workerChoiceStrategyContext .getWorkerChoiceStrategy() - .workerLastVirtualTaskTimestamp.get(worker).end + .workerLastVirtualTaskTimestamp.get(workerKey).end ).toBe(0) } // We need to clean up the resources after our test @@ -456,18 +456,18 @@ describe('Selection strategies test suite', () => { .workerLastVirtualTaskTimestamp ).toBeUndefined() pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE) - for (const worker of pool.workerChoiceStrategyContext + for (const workerKey of pool.workerChoiceStrategyContext .getWorkerChoiceStrategy() .workerLastVirtualTaskTimestamp.keys()) { expect( pool.workerChoiceStrategyContext .getWorkerChoiceStrategy() - .workerLastVirtualTaskTimestamp.get(worker).start + .workerLastVirtualTaskTimestamp.get(workerKey).start ).toBe(0) expect( pool.workerChoiceStrategyContext .getWorkerChoiceStrategy() - .workerLastVirtualTaskTimestamp.get(worker).end + .workerLastVirtualTaskTimestamp.get(workerKey).end ).toBe(0) } await pool.destroy() @@ -481,18 +481,20 @@ describe('Selection strategies test suite', () => { .workerChoiceStrategy.workerLastVirtualTaskTimestamp ).toBeUndefined() pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE) - for (const worker of pool.workerChoiceStrategyContext + for (const workerKey of pool.workerChoiceStrategyContext .getWorkerChoiceStrategy() .workerChoiceStrategy.workerLastVirtualTaskTimestamp.keys()) { expect( pool.workerChoiceStrategyContext .getWorkerChoiceStrategy() - .workerChoiceStrategy.workerLastVirtualTaskTimestamp.get(worker).start + .workerChoiceStrategy.workerLastVirtualTaskTimestamp.get(workerKey) + .start ).toBe(0) expect( pool.workerChoiceStrategyContext .getWorkerChoiceStrategy() - .workerChoiceStrategy.workerLastVirtualTaskTimestamp.get(worker).end + .workerChoiceStrategy.workerLastVirtualTaskTimestamp.get(workerKey) + .end ).toBe(0) } // We need to clean up the resources after our test @@ -515,18 +517,18 @@ describe('Selection strategies test suite', () => { pool.workerChoiceStrategyContext.getWorkerChoiceStrategy() .defaultWorkerWeight ).toBeGreaterThan(0) - for (const worker of pool.workerChoiceStrategyContext + for (const workerKey of pool.workerChoiceStrategyContext .getWorkerChoiceStrategy() .workersTaskRunTime.keys()) { expect( pool.workerChoiceStrategyContext .getWorkerChoiceStrategy() - .workersTaskRunTime.get(worker).weight + .workersTaskRunTime.get(workerKey).weight ).toBeGreaterThan(0) expect( pool.workerChoiceStrategyContext .getWorkerChoiceStrategy() - .workersTaskRunTime.get(worker).runTime + .workersTaskRunTime.get(workerKey).runTime ).toBe(0) } // We need to clean up the resources after our test @@ -628,13 +630,13 @@ describe('Selection strategies test suite', () => { pool.workerChoiceStrategyContext.getWorkerChoiceStrategy() .defaultWorkerWeight ).toBeGreaterThan(0) - for (const worker of pool.workerChoiceStrategyContext + for (const workerKey of pool.workerChoiceStrategyContext .getWorkerChoiceStrategy() .workersTaskRunTime.keys()) { expect( pool.workerChoiceStrategyContext .getWorkerChoiceStrategy() - .workersTaskRunTime.get(worker).runTime + .workersTaskRunTime.get(workerKey).runTime ).toBe(0) } await pool.destroy() @@ -664,13 +666,13 @@ describe('Selection strategies test suite', () => { pool.workerChoiceStrategyContext.getWorkerChoiceStrategy() .workerChoiceStrategy.defaultWorkerWeight ).toBeGreaterThan(0) - for (const worker of pool.workerChoiceStrategyContext + for (const workerKey of pool.workerChoiceStrategyContext .getWorkerChoiceStrategy() .workerChoiceStrategy.workersTaskRunTime.keys()) { expect( pool.workerChoiceStrategyContext .getWorkerChoiceStrategy() - .workerChoiceStrategy.workersTaskRunTime.get(worker).runTime + .workerChoiceStrategy.workersTaskRunTime.get(workerKey).runTime ).toBe(0) } // We need to clean up the resources after our test diff --git a/tests/pools/selection-strategies/worker-choice-strategy-context.test.js b/tests/pools/selection-strategies/worker-choice-strategy-context.test.js index 9380a038..9ab2babf 100644 --- a/tests/pools/selection-strategies/worker-choice-strategy-context.test.js +++ b/tests/pools/selection-strategies/worker-choice-strategy-context.test.js @@ -60,15 +60,15 @@ describe('Worker choice strategy context test suite', () => { const WorkerChoiceStrategyStub = sinon.createStubInstance( RoundRobinWorkerChoiceStrategy, { - choose: sinon.stub().returns('worker') + choose: sinon.stub().returns(0) } ) workerChoiceStrategyContext.workerChoiceStrategy = WorkerChoiceStrategyStub - const chosenWorker = workerChoiceStrategyContext.execute() + const chosenWorkerKey = workerChoiceStrategyContext.execute() expect( workerChoiceStrategyContext.getWorkerChoiceStrategy().choose.calledOnce ).toBe(true) - expect(chosenWorker).toBe('worker') + expect(chosenWorkerKey).toBe(0) }) it('Verify that execute() return the worker chosen by the strategy with dynamic pool', () => { @@ -78,15 +78,15 @@ describe('Worker choice strategy context test suite', () => { const WorkerChoiceStrategyStub = sinon.createStubInstance( RoundRobinWorkerChoiceStrategy, { - choose: sinon.stub().returns('worker') + choose: sinon.stub().returns(0) } ) workerChoiceStrategyContext.workerChoiceStrategy = WorkerChoiceStrategyStub - const chosenWorker = workerChoiceStrategyContext.execute() + const chosenWorkerKey = workerChoiceStrategyContext.execute() expect( workerChoiceStrategyContext.getWorkerChoiceStrategy().choose.calledOnce ).toBe(true) - expect(chosenWorker).toBe('worker') + expect(chosenWorkerKey).toBe(0) }) it('Verify that setWorkerChoiceStrategy() works with ROUND_ROBIN and fixed pool', () => { -- 2.34.1