From f59e102739e13698f278f1d9d58ab26ed8150442 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Fri, 30 Jun 2023 22:31:57 +0200 Subject: [PATCH 1/1] feat: add worker info to worker nodes MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 71 +++++---- .../abstract-worker-choice-strategy.ts | 20 ++- .../least-busy-worker-choice-strategy.ts | 3 +- .../least-elu-worker-choice-strategy.ts | 2 +- .../least-used-worker-choice-strategy.ts | 2 +- src/pools/worker.ts | 27 +++- src/utility-types.ts | 10 +- src/worker/abstract-worker.ts | 8 ++ src/worker/cluster-worker.ts | 9 ++ src/worker/thread-worker.ts | 15 +- tests/pools/abstract/abstract-pool.test.js | 30 ++-- tests/pools/cluster/fixed.test.js | 24 ++-- .../selection-strategies.test.js | 136 +++++++++--------- tests/pools/thread/fixed.test.js | 26 ++-- 14 files changed, 231 insertions(+), 152 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index da0f2e77..6fde2491 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -236,6 +236,14 @@ export abstract class AbstractPool< } } + private get starting (): boolean { + return this.workerNodes.some(workerNode => !workerNode.info.started) + } + + private get started (): boolean { + return this.workerNodes.some(workerNode => workerNode.info.started) + } + /** @inheritDoc */ public get info (): PoolInfo { return { @@ -246,41 +254,39 @@ export abstract class AbstractPool< workerNodes: this.workerNodes.length, idleWorkerNodes: this.workerNodes.reduce( (accumulator, workerNode) => - workerNode.workerUsage.tasks.executing === 0 + workerNode.usage.tasks.executing === 0 ? accumulator + 1 : accumulator, 0 ), busyWorkerNodes: this.workerNodes.reduce( (accumulator, workerNode) => - workerNode.workerUsage.tasks.executing > 0 - ? accumulator + 1 - : accumulator, + workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator, 0 ), executedTasks: this.workerNodes.reduce( (accumulator, workerNode) => - accumulator + workerNode.workerUsage.tasks.executed, + accumulator + workerNode.usage.tasks.executed, 0 ), executingTasks: this.workerNodes.reduce( (accumulator, workerNode) => - accumulator + workerNode.workerUsage.tasks.executing, + accumulator + workerNode.usage.tasks.executing, 0 ), queuedTasks: this.workerNodes.reduce( (accumulator, workerNode) => - accumulator + workerNode.workerUsage.tasks.queued, + accumulator + workerNode.usage.tasks.queued, 0 ), maxQueuedTasks: this.workerNodes.reduce( (accumulator, workerNode) => - accumulator + workerNode.workerUsage.tasks.maxQueued, + accumulator + workerNode.usage.tasks.maxQueued, 0 ), failedTasks: this.workerNodes.reduce( (accumulator, workerNode) => - accumulator + workerNode.workerUsage.tasks.failed, + accumulator + workerNode.usage.tasks.failed, 0 ) } @@ -308,11 +314,22 @@ export abstract class AbstractPool< */ protected abstract get maxSize (): number + /** + * Get the worker given its id. + * + * @param workerId - The worker id. + * @returns The worker if found in the pool worker nodes, `undefined` otherwise. + */ + private getWorkerById (workerId: number): Worker | undefined { + return this.workerNodes.find(workerNode => workerNode.info.id === workerId) + ?.worker + } + /** * Gets the given worker its worker node key. * * @param worker - The worker. - * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise. + * @returns The worker node key if found in the pool worker nodes, `-1` otherwise. */ private getWorkerNodeKey (worker: Worker): number { return this.workerNodes.findIndex( @@ -408,7 +425,7 @@ export abstract class AbstractPool< protected internalBusy (): boolean { return ( this.workerNodes.findIndex(workerNode => { - return workerNode.workerUsage.tasks.executing === 0 + return workerNode.usage.tasks.executing === 0 }) === -1 ) } @@ -434,7 +451,7 @@ export abstract class AbstractPool< if ( this.opts.enableTasksQueue === true && (this.busy || - this.workerNodes[workerNodeKey].workerUsage.tasks.executing >= + this.workerNodes[workerNodeKey].usage.tasks.executing >= ((this.opts.tasksQueueOptions as TasksQueueOptions) .concurrency as number)) ) { @@ -491,7 +508,7 @@ export abstract class AbstractPool< workerNodeKey: number, task: Task ): void { - const workerUsage = this.workerNodes[workerNodeKey].workerUsage + const workerUsage = this.workerNodes[workerNodeKey].usage ++workerUsage.tasks.executing this.updateWaitTimeWorkerUsage(workerUsage, task) } @@ -507,8 +524,7 @@ export abstract class AbstractPool< worker: Worker, message: MessageValue ): void { - const workerUsage = - this.workerNodes[this.getWorkerNodeKey(worker)].workerUsage + const workerUsage = this.workerNodes[this.getWorkerNodeKey(worker)].usage this.updateTaskStatisticsWorkerUsage(workerUsage, message) this.updateRunTimeWorkerUsage(workerUsage, message) this.updateEluWorkerUsage(workerUsage, message) @@ -715,7 +731,7 @@ export abstract class AbstractPool< if (this.emitter != null) { this.emitter.emit(PoolEvents.error, error) } - if (this.opts.restartWorkerOnError === true) { + if (this.opts.restartWorkerOnError === true && !this.starting) { this.createAndSetupWorker() } }) @@ -747,11 +763,9 @@ export abstract class AbstractPool< isKillBehavior(KillBehaviors.HARD, message.kill) || (message.kill != null && ((this.opts.enableTasksQueue === false && - this.workerNodes[workerNodeKey].workerUsage.tasks.executing === - 0) || + this.workerNodes[workerNodeKey].usage.tasks.executing === 0) || (this.opts.enableTasksQueue === true && - this.workerNodes[workerNodeKey].workerUsage.tasks.executing === - 0 && + this.workerNodes[workerNodeKey].usage.tasks.executing === 0 && this.tasksQueueSize(workerNodeKey) === 0))) ) { // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime) @@ -768,7 +782,12 @@ export abstract class AbstractPool< */ protected workerListener (): (message: MessageValue) => void { return message => { - if (message.id != null) { + if (message.workerId != null && message.started != null) { + // Worker started message received + this.workerNodes[ + this.getWorkerNodeKey(this.getWorkerById(message.workerId) as Worker) + ].info.started = message.started + } else if (message.id != null) { // Task execution response received const promiseResponse = this.promiseResponseMap.get(message.id) if (promiseResponse != null) { @@ -819,7 +838,7 @@ export abstract class AbstractPool< workerNode: WorkerNode, workerUsage: WorkerUsage ): void { - workerNode.workerUsage = workerUsage + workerNode.usage = workerUsage } /** @@ -831,7 +850,8 @@ export abstract class AbstractPool< private pushWorkerNode (worker: Worker): number { this.workerNodes.push({ worker, - workerUsage: this.getWorkerUsage(), + info: { id: worker.threadId ?? worker.id, started: false }, + usage: this.getWorkerUsage(), tasksQueue: new Queue>() }) const workerNodeKey = this.getWorkerNodeKey(worker) @@ -847,18 +867,21 @@ export abstract class AbstractPool< // * // * @param workerNodeKey - The worker node key. // * @param worker - The worker. + // * @param workerInfo - The worker info. // * @param workerUsage - The worker usage. // * @param tasksQueue - The worker task queue. // */ // private setWorkerNode ( // workerNodeKey: number, // worker: Worker, + // workerInfo: WorkerInfo, // workerUsage: WorkerUsage, // tasksQueue: Queue> // ): void { // this.workerNodes[workerNodeKey] = { // worker, - // workerUsage, + // info: workerInfo, + // usage: workerUsage, // tasksQueue // } // } diff --git a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts index 37759198..631f286d 100644 --- a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts @@ -152,8 +152,8 @@ export abstract class AbstractWorkerChoiceStrategy< */ protected getWorkerTaskRunTime (workerNodeKey: number): number { return this.taskStatisticsRequirements.runTime.median - ? this.pool.workerNodes[workerNodeKey].workerUsage.runTime.median - : this.pool.workerNodes[workerNodeKey].workerUsage.runTime.average + ? this.pool.workerNodes[workerNodeKey].usage.runTime.median + : this.pool.workerNodes[workerNodeKey].usage.runTime.average } /** @@ -166,8 +166,8 @@ export abstract class AbstractWorkerChoiceStrategy< */ protected getWorkerTaskWaitTime (workerNodeKey: number): number { return this.taskStatisticsRequirements.waitTime.median - ? this.pool.workerNodes[workerNodeKey].workerUsage.waitTime.median - : this.pool.workerNodes[workerNodeKey].workerUsage.waitTime.average + ? this.pool.workerNodes[workerNodeKey].usage.waitTime.median + : this.pool.workerNodes[workerNodeKey].usage.waitTime.average } /** @@ -180,8 +180,8 @@ export abstract class AbstractWorkerChoiceStrategy< */ protected getWorkerTaskElu (workerNodeKey: number): number { return this.taskStatisticsRequirements.elu.median - ? this.pool.workerNodes[workerNodeKey].workerUsage.elu.active.median - : this.pool.workerNodes[workerNodeKey].workerUsage.elu.active.average + ? this.pool.workerNodes[workerNodeKey].usage.elu.active.median + : this.pool.workerNodes[workerNodeKey].usage.elu.active.average } protected computeDefaultWorkerWeight (): number { @@ -206,7 +206,7 @@ export abstract class AbstractWorkerChoiceStrategy< // */ // private findFirstFreeWorkerNodeKey (): number { // return this.pool.workerNodes.findIndex(workerNode => { - // return workerNode.workerUsage.tasks.executing === 0 + // return workerNode.usage.tasks.executing === 0 // }) // } @@ -222,16 +222,14 @@ export abstract class AbstractWorkerChoiceStrategy< // private findLastFreeWorkerNodeKey (): number { // // It requires node >= 18.0.0: // // return this.workerNodes.findLastIndex(workerNode => { - // // return workerNode.workerUsage.tasks.executing === 0 + // // return workerNode.usage.tasks.executing === 0 // // }) // for ( // let workerNodeKey = this.pool.workerNodes.length - 1; // workerNodeKey >= 0; // workerNodeKey-- // ) { - // if ( - // this.pool.workerNodes[workerNodeKey].workerUsage.tasks.executing === 0 - // ) { + // if (this.pool.workerNodes[workerNodeKey].usage.tasks.executing === 0) { // return workerNodeKey // } // } diff --git a/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts b/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts index 7e9d5814..14aee44f 100644 --- a/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts @@ -64,8 +64,7 @@ export class LeastBusyWorkerChoiceStrategy< let minTime = Infinity for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) { const workerTime = - workerNode.workerUsage.runTime.aggregate + - workerNode.workerUsage.waitTime.aggregate + workerNode.usage.runTime.aggregate + workerNode.usage.waitTime.aggregate if (workerTime === 0) { this.nextWorkerNodeId = workerNodeKey break diff --git a/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts b/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts index 9c29a30a..cbe00a47 100644 --- a/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts @@ -59,7 +59,7 @@ export class LeastEluWorkerChoiceStrategy< public choose (): number { let minWorkerElu = Infinity for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) { - const workerUsage = workerNode.workerUsage + const workerUsage = workerNode.usage const workerElu = workerUsage.elu?.active.aggregate ?? 0 if (workerElu === 0) { this.nextWorkerNodeId = workerNodeKey diff --git a/src/pools/selection-strategies/least-used-worker-choice-strategy.ts b/src/pools/selection-strategies/least-used-worker-choice-strategy.ts index 5c266f28..53aa05ee 100644 --- a/src/pools/selection-strategies/least-used-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/least-used-worker-choice-strategy.ts @@ -44,7 +44,7 @@ export class LeastUsedWorkerChoiceStrategy< public choose (): number { let minNumberOfTasks = Infinity for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) { - const workerTaskStatistics = workerNode.workerUsage.tasks + const workerTaskStatistics = workerNode.usage.tasks const workerTasks = workerTaskStatistics.executed + workerTaskStatistics.executing + diff --git a/src/pools/worker.ts b/src/pools/worker.ts index 07eabdfe..c92478d4 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -118,6 +118,22 @@ export interface TaskStatistics { failed: number } +/** + * Worker information. + * + * @internal + */ +export interface WorkerInfo { + /** + * Worker Id. + */ + id: number | undefined + /** + * Started flag. + */ + started: boolean +} + /** * Worker usage statistics. * @@ -146,6 +162,11 @@ export interface WorkerUsage { * Worker interface. */ export interface IWorker { + /** + * Worker Id. + */ + id?: number + threadId?: number /** * Register an event listener. * @@ -177,10 +198,14 @@ export interface WorkerNode { * Worker node worker. */ readonly worker: Worker + /** + * Worker node worker info. + */ + info: WorkerInfo /** * Worker node worker usage statistics. */ - workerUsage: WorkerUsage + usage: WorkerUsage /** * Worker node tasks queue. */ diff --git a/src/utility-types.ts b/src/utility-types.ts index f330d192..1f724274 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -53,6 +53,10 @@ export interface WorkerStatistics { */ export interface MessageValue extends Task { + /** + * Worker Id. + */ + readonly workerId?: number /** * Kill code. */ @@ -66,9 +70,13 @@ export interface MessageValue */ readonly taskPerformance?: TaskPerformance /** - * Whether to compute the given statistics or not. + * Whether the worker computes the given statistics or not. */ readonly statistics?: WorkerStatistics + /** + * Whether the worker has started or not. + */ + readonly started?: boolean } /** diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index 25be4820..242c4698 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -36,6 +36,10 @@ export abstract class AbstractWorker< Data = unknown, Response = unknown > extends AsyncResource { + /** + * Worker Id. + */ + protected abstract id: number /** * Task function(s) processed by the worker when the pool's `execution` function is invoked. */ @@ -225,6 +229,7 @@ export abstract class AbstractWorker< this.sendToMainWorker({ data: res, taskPerformance, + workerId: this.id, id: message.id }) } catch (e) { @@ -234,6 +239,7 @@ export abstract class AbstractWorker< message: err, data: message.data }, + workerId: this.id, id: message.id }) } finally { @@ -258,6 +264,7 @@ export abstract class AbstractWorker< this.sendToMainWorker({ data: res, taskPerformance, + workerId: this.id, id: message.id }) return null @@ -269,6 +276,7 @@ export abstract class AbstractWorker< message: err, data: message.data }, + workerId: this.id, id: message.id }) }) diff --git a/src/worker/cluster-worker.ts b/src/worker/cluster-worker.ts index 13735b1d..16dddd29 100644 --- a/src/worker/cluster-worker.ts +++ b/src/worker/cluster-worker.ts @@ -41,10 +41,19 @@ export class ClusterWorker< cluster.worker as Worker, opts ) + if (!this.isMain) { + this.sendToMainWorker({ workerId: this.id, started: true }) + } + } + + /** @inheritDoc */ + protected get id (): number { + return this.getMainWorker().id } /** @inheritDoc */ protected sendToMainWorker (message: MessageValue): void { + console.log('sending message to main worker(cluster)', message) this.getMainWorker().send(message) } diff --git a/src/worker/thread-worker.ts b/src/worker/thread-worker.ts index b6573a97..7a766a95 100644 --- a/src/worker/thread-worker.ts +++ b/src/worker/thread-worker.ts @@ -1,4 +1,9 @@ -import { type MessagePort, isMainThread, parentPort } from 'node:worker_threads' +import { + type MessagePort, + isMainThread, + parentPort, + threadId +} from 'node:worker_threads' import type { MessageValue } from '../utility-types' import { AbstractWorker } from './abstract-worker' import type { WorkerOptions } from './worker-options' @@ -41,10 +46,18 @@ export class ThreadWorker< parentPort as MessagePort, opts ) + if (!this.isMain) { + this.sendToMainWorker({ workerId: this.id, started: true }) + } + } + + protected get id (): number { + return threadId } /** @inheritDoc */ protected sendToMainWorker (message: MessageValue): void { + console.log('sending message to main worker(thread)', message) this.getMainWorker().postMessage(message) } } diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index b5a53487..32f5bbee 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -406,11 +406,14 @@ describe('Abstract pool test suite', () => { maxQueuedTasks: 0, failedTasks: 0 }) + for (const workerNode of pool.workerNodes) { + console.log('thread:workerNode.info', workerNode.info) + } await pool.destroy() pool = new DynamicClusterPool( numberOfWorkers, numberOfWorkers * 2, - './tests/worker-files/thread/testWorker.js' + './tests/worker-files/cluster/testWorker.js' ) expect(pool.info).toStrictEqual({ type: PoolTypes.dynamic, @@ -426,13 +429,16 @@ describe('Abstract pool test suite', () => { maxQueuedTasks: 0, failedTasks: 0 }) + for (const workerNode of pool.workerNodes) { + console.log('cluster:workerNode.info', workerNode.info) + } await pool.destroy() }) it('Simulate worker not found', async () => { const pool = new StubPoolWithRemoveAllWorker( numberOfWorkers, - './tests/worker-files/cluster/testWorker.js', + './tests/worker-files/thread/testWorker.js', { errorHandler: e => console.error(e) } @@ -450,7 +456,7 @@ describe('Abstract pool test suite', () => { './tests/worker-files/cluster/testWorker.js' ) for (const workerNode of pool.workerNodes) { - expect(workerNode.workerUsage).toStrictEqual({ + expect(workerNode.usage).toStrictEqual({ tasks: { executed: 0, executing: 0, @@ -515,7 +521,7 @@ describe('Abstract pool test suite', () => { promises.add(pool.execute()) } for (const workerNode of pool.workerNodes) { - expect(workerNode.workerUsage).toStrictEqual({ + expect(workerNode.usage).toStrictEqual({ tasks: { executed: 0, executing: maxMultiplier, @@ -554,7 +560,7 @@ describe('Abstract pool test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.workerUsage).toStrictEqual({ + expect(workerNode.usage).toStrictEqual({ tasks: { executed: maxMultiplier, executing: 0, @@ -607,7 +613,7 @@ describe('Abstract pool test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.workerUsage).toStrictEqual({ + expect(workerNode.usage).toStrictEqual({ tasks: { executed: expect.any(Number), executing: 0, @@ -643,14 +649,12 @@ describe('Abstract pool test suite', () => { utilization: 0 } }) - expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0) - expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( - maxMultiplier - ) + expect(workerNode.usage.tasks.executed).toBeGreaterThan(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(maxMultiplier) } pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE) for (const workerNode of pool.workerNodes) { - expect(workerNode.workerUsage).toStrictEqual({ + expect(workerNode.usage).toStrictEqual({ tasks: { executed: 0, executing: 0, @@ -686,8 +690,8 @@ describe('Abstract pool test suite', () => { utilization: 0 } }) - expect(workerNode.workerUsage.runTime.history.length).toBe(0) - expect(workerNode.workerUsage.waitTime.history.length).toBe(0) + expect(workerNode.usage.runTime.history.length).toBe(0) + expect(workerNode.usage.waitTime.history.length).toBe(0) } await pool.destroy() }) diff --git a/tests/pools/cluster/fixed.test.js b/tests/pools/cluster/fixed.test.js index 8f3fad8b..631c59f0 100644 --- a/tests/pools/cluster/fixed.test.js +++ b/tests/pools/cluster/fixed.test.js @@ -96,12 +96,12 @@ describe('Fixed cluster pool test suite', () => { } expect(promises.size).toBe(numberOfWorkers * maxMultiplier) for (const workerNode of queuePool.workerNodes) { - expect(workerNode.workerUsage.tasks.executing).toBeLessThanOrEqual( + expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual( queuePool.opts.tasksQueueOptions.concurrency ) - expect(workerNode.workerUsage.tasks.executed).toBe(0) - expect(workerNode.workerUsage.tasks.queued).toBeGreaterThan(0) - expect(workerNode.workerUsage.tasks.maxQueued).toBeGreaterThan(0) + expect(workerNode.usage.tasks.executed).toBe(0) + expect(workerNode.usage.tasks.queued).toBeGreaterThan(0) + expect(workerNode.usage.tasks.maxQueued).toBeGreaterThan(0) } expect(queuePool.info.executingTasks).toBe(numberOfWorkers) expect(queuePool.info.queuedTasks).toBe( @@ -112,13 +112,11 @@ describe('Fixed cluster pool test suite', () => { ) await Promise.all(promises) for (const workerNode of queuePool.workerNodes) { - expect(workerNode.workerUsage.tasks.executing).toBe(0) - expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0) - expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( - maxMultiplier - ) - expect(workerNode.workerUsage.tasks.queued).toBe(0) - expect(workerNode.workerUsage.tasks.maxQueued).toBe(1) + expect(workerNode.usage.tasks.executing).toBe(0) + expect(workerNode.usage.tasks.executed).toBeGreaterThan(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(maxMultiplier) + expect(workerNode.usage.tasks.queued).toBe(0) + expect(workerNode.usage.tasks.maxQueued).toBe(1) } }) @@ -154,7 +152,7 @@ describe('Fixed cluster pool test suite', () => { }) expect( errorPool.workerNodes.some( - workerNode => workerNode.workerUsage.tasks.failed === 1 + workerNode => workerNode.usage.tasks.failed === 1 ) ).toBe(true) }) @@ -180,7 +178,7 @@ describe('Fixed cluster pool test suite', () => { }) expect( asyncErrorPool.workerNodes.some( - workerNode => workerNode.workerUsage.tasks.failed === 1 + workerNode => workerNode.usage.tasks.failed === 1 ) ).toBe(true) }) diff --git a/tests/pools/selection-strategies/selection-strategies.test.js b/tests/pools/selection-strategies/selection-strategies.test.js index dd5a398f..b2aa4a0b 100644 --- a/tests/pools/selection-strategies/selection-strategies.test.js +++ b/tests/pools/selection-strategies/selection-strategies.test.js @@ -209,7 +209,7 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.workerUsage).toStrictEqual({ + expect(workerNode.usage).toStrictEqual({ tasks: { executed: maxMultiplier, executing: 0, @@ -270,7 +270,7 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.workerUsage).toStrictEqual({ + expect(workerNode.usage).toStrictEqual({ tasks: { executed: maxMultiplier, executing: 0, @@ -476,7 +476,7 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.workerUsage).toStrictEqual({ + expect(workerNode.usage).toStrictEqual({ tasks: { executed: expect.any(Number), executing: 0, @@ -512,8 +512,8 @@ describe('Selection strategies test suite', () => { utilization: 0 } }) - expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0) - expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( max * maxMultiplier ) } @@ -536,7 +536,7 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.workerUsage).toStrictEqual({ + expect(workerNode.usage).toStrictEqual({ tasks: { executed: expect.any(Number), executing: 0, @@ -572,8 +572,8 @@ describe('Selection strategies test suite', () => { utilization: 0 } }) - expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0) - expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( max * maxMultiplier ) } @@ -675,7 +675,7 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.workerUsage).toStrictEqual({ + expect(workerNode.usage).toStrictEqual({ tasks: { executed: expect.any(Number), executing: 0, @@ -711,14 +711,12 @@ describe('Selection strategies test suite', () => { utilization: 0 } }) - expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0) - expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( max * maxMultiplier ) - expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThanOrEqual(0) - expect(workerNode.workerUsage.waitTime.aggregate).toBeGreaterThanOrEqual( - 0 - ) + expect(workerNode.usage.runTime.aggregate).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.waitTime.aggregate).toBeGreaterThanOrEqual(0) } // We need to clean up the resources after our test await pool.destroy() @@ -739,7 +737,7 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.workerUsage).toStrictEqual({ + expect(workerNode.usage).toStrictEqual({ tasks: { executed: expect.any(Number), executing: 0, @@ -775,14 +773,12 @@ describe('Selection strategies test suite', () => { utilization: 0 } }) - expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0) - expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( max * maxMultiplier ) - expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThanOrEqual(0) - expect(workerNode.workerUsage.waitTime.aggregate).toBeGreaterThanOrEqual( - 0 - ) + expect(workerNode.usage.runTime.aggregate).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.waitTime.aggregate).toBeGreaterThanOrEqual(0) } // We need to clean up the resources after our test await pool.destroy() @@ -882,7 +878,7 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.workerUsage).toStrictEqual({ + expect(workerNode.usage).toStrictEqual({ tasks: { executed: expect.any(Number), executing: 0, @@ -918,12 +914,12 @@ describe('Selection strategies test suite', () => { utilization: expect.any(Number) } }) - expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0) - expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( max * maxMultiplier ) - expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0) - expect(workerNode.workerUsage.elu.utilization).toBeLessThanOrEqual(1) + expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1) } // We need to clean up the resources after our test await pool.destroy() @@ -944,7 +940,7 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.workerUsage).toStrictEqual({ + expect(workerNode.usage).toStrictEqual({ tasks: { executed: expect.any(Number), executing: 0, @@ -980,12 +976,12 @@ describe('Selection strategies test suite', () => { utilization: expect.any(Number) } }) - expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0) - expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( max * maxMultiplier ) - expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0) - expect(workerNode.workerUsage.elu.utilization).toBeLessThanOrEqual(1) + expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1) } // We need to clean up the resources after our test await pool.destroy() @@ -1085,7 +1081,7 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.workerUsage).toStrictEqual({ + expect(workerNode.usage).toStrictEqual({ tasks: { executed: expect.any(Number), executing: 0, @@ -1121,14 +1117,14 @@ describe('Selection strategies test suite', () => { utilization: expect.any(Number) } }) - expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0) - expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( max * maxMultiplier ) - expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThanOrEqual(0) - expect(workerNode.workerUsage.runTime.average).toBeGreaterThanOrEqual(0) - expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0) - expect(workerNode.workerUsage.elu.utilization).toBeLessThanOrEqual(1) + expect(workerNode.usage.runTime.aggregate).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.runTime.average).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1) } expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( @@ -1154,7 +1150,7 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.workerUsage).toStrictEqual({ + expect(workerNode.usage).toStrictEqual({ tasks: { executed: expect.any(Number), executing: 0, @@ -1190,14 +1186,14 @@ describe('Selection strategies test suite', () => { utilization: expect.any(Number) } }) - expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0) - expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( max * maxMultiplier ) - expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThanOrEqual(0) - expect(workerNode.workerUsage.runTime.average).toBeGreaterThanOrEqual(0) - expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0) - expect(workerNode.workerUsage.elu.utilization).toBeLessThanOrEqual(1) + expect(workerNode.usage.runTime.aggregate).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.runTime.average).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1) } expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( @@ -1228,7 +1224,7 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.workerUsage).toStrictEqual({ + expect(workerNode.usage).toStrictEqual({ tasks: { executed: expect.any(Number), executing: 0, @@ -1264,14 +1260,14 @@ describe('Selection strategies test suite', () => { utilization: expect.any(Number) } }) - expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0) - expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( max * maxMultiplier ) - expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThanOrEqual(0) - expect(workerNode.workerUsage.runTime.median).toBeGreaterThanOrEqual(0) - expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0) - expect(workerNode.workerUsage.elu.utilization).toBeLessThanOrEqual(1) + expect(workerNode.usage.runTime.aggregate).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.runTime.median).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.elu.utilization).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.elu.utilization).toBeLessThanOrEqual(1) } expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( @@ -1450,7 +1446,7 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.workerUsage).toStrictEqual({ + expect(workerNode.usage).toStrictEqual({ tasks: { executed: expect.any(Number), executing: 0, @@ -1486,12 +1482,12 @@ describe('Selection strategies test suite', () => { utilization: 0 } }) - expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0) - expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( max * maxMultiplier ) - expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThanOrEqual(0) - expect(workerNode.workerUsage.runTime.average).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.runTime.aggregate).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.runTime.average).toBeGreaterThanOrEqual(0) } expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( @@ -1522,7 +1518,7 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.workerUsage).toStrictEqual({ + expect(workerNode.usage).toStrictEqual({ tasks: { executed: expect.any(Number), executing: 0, @@ -1558,12 +1554,12 @@ describe('Selection strategies test suite', () => { utilization: 0 } }) - expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0) - expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( max * maxMultiplier ) - expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThan(0) - expect(workerNode.workerUsage.runTime.average).toBeGreaterThan(0) + expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0) + expect(workerNode.usage.runTime.average).toBeGreaterThan(0) } expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( @@ -1599,7 +1595,7 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.workerUsage).toStrictEqual({ + expect(workerNode.usage).toStrictEqual({ tasks: { executed: expect.any(Number), executing: 0, @@ -1635,12 +1631,12 @@ describe('Selection strategies test suite', () => { utilization: 0 } }) - expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0) - expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( max * maxMultiplier ) - expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThan(0) - expect(workerNode.workerUsage.runTime.median).toBeGreaterThan(0) + expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0) + expect(workerNode.usage.runTime.median).toBeGreaterThan(0) } expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( @@ -1833,7 +1829,7 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.workerUsage).toStrictEqual({ + expect(workerNode.usage).toStrictEqual({ tasks: { executed: maxMultiplier, executing: 0, @@ -1916,7 +1912,7 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.workerUsage).toStrictEqual({ + expect(workerNode.usage).toStrictEqual({ tasks: { executed: maxMultiplier, executing: 0, diff --git a/tests/pools/thread/fixed.test.js b/tests/pools/thread/fixed.test.js index 017cd7e3..8a0cfead 100644 --- a/tests/pools/thread/fixed.test.js +++ b/tests/pools/thread/fixed.test.js @@ -96,12 +96,12 @@ describe('Fixed thread pool test suite', () => { } expect(promises.size).toBe(numberOfThreads * maxMultiplier) for (const workerNode of queuePool.workerNodes) { - expect(workerNode.workerUsage.tasks.executing).toBeLessThanOrEqual( + expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual( queuePool.opts.tasksQueueOptions.concurrency ) - expect(workerNode.workerUsage.tasks.executed).toBe(0) - expect(workerNode.workerUsage.tasks.queued).toBeGreaterThan(0) - expect(workerNode.workerUsage.tasks.maxQueued).toBeGreaterThan(0) + expect(workerNode.usage.tasks.executed).toBe(0) + expect(workerNode.usage.tasks.queued).toBeGreaterThan(0) + expect(workerNode.usage.tasks.maxQueued).toBeGreaterThan(0) } expect(queuePool.info.executingTasks).toBe(numberOfThreads) expect(queuePool.info.queuedTasks).toBe( @@ -112,13 +112,11 @@ describe('Fixed thread pool test suite', () => { ) await Promise.all(promises) for (const workerNode of queuePool.workerNodes) { - expect(workerNode.workerUsage.tasks.executing).toBe(0) - expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0) - expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( - maxMultiplier - ) - expect(workerNode.workerUsage.tasks.queued).toBe(0) - expect(workerNode.workerUsage.tasks.maxQueued).toBe(1) + expect(workerNode.usage.tasks.executing).toBe(0) + expect(workerNode.usage.tasks.executed).toBeGreaterThan(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(maxMultiplier) + expect(workerNode.usage.tasks.queued).toBe(0) + expect(workerNode.usage.tasks.maxQueued).toBe(1) } }) @@ -156,7 +154,7 @@ describe('Fixed thread pool test suite', () => { }) expect( errorPool.workerNodes.some( - workerNode => workerNode.workerUsage.tasks.failed === 1 + workerNode => workerNode.usage.tasks.failed === 1 ) ).toBe(true) }) @@ -184,7 +182,7 @@ describe('Fixed thread pool test suite', () => { }) expect( asyncErrorPool.workerNodes.some( - workerNode => workerNode.workerUsage.tasks.failed === 1 + workerNode => workerNode.usage.tasks.failed === 1 ) ).toBe(true) }) @@ -210,7 +208,7 @@ describe('Fixed thread pool test suite', () => { }) it('Verify that thread pool options are checked', async () => { - const workerFilePath = './tests/worker-files/cluster/testWorker.js' + const workerFilePath = './tests/worker-files/thread/testWorker.js' let pool1 = new FixedThreadPool(numberOfThreads, workerFilePath) expect(pool1.opts.workerOptions).toBeUndefined() await pool1.destroy() -- 2.34.1