From 6b27d40762317ec8502657663bdc839e358cda03 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Fri, 2 Jun 2023 17:28:54 +0200 Subject: [PATCH] feat: expose pool information MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 1 + src/index.ts | 3 +- src/pools/abstract-pool.ts | 76 +++++++++++++--------- src/pools/cluster/dynamic.ts | 9 ++- src/pools/cluster/fixed.ts | 12 ++-- src/pools/pool.ts | 37 ++++++++--- src/pools/thread/dynamic.ts | 10 ++- src/pools/thread/fixed.ts | 12 ++-- src/queue.ts | 13 ++++ tests/pools/abstract/abstract-pool.test.js | 39 ++++++++++- tests/pools/cluster/fixed.test.js | 7 +- tests/pools/thread/fixed.test.js | 7 +- 12 files changed, 160 insertions(+), 66 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a2d09e6e..862fcc4c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Add `taskError` pool event for task execution error. +- Add pool information `info` property to pool. - Emit pool information on `busy` and `full` pool events. ## [2.5.1] - 2023-06-01 diff --git a/src/index.ts b/src/index.ts index 03ee2809..fd3cefd2 100644 --- a/src/index.ts +++ b/src/index.ts @@ -2,11 +2,12 @@ export { DynamicClusterPool } from './pools/cluster/dynamic' export { FixedClusterPool } from './pools/cluster/fixed' export type { ClusterPoolOptions } from './pools/cluster/fixed' export type { AbstractPool } from './pools/abstract-pool' -export { PoolEvents } from './pools/pool' +export { PoolEvents, PoolTypes } from './pools/pool' export type { IPool, PoolEmitter, PoolEvent, + PoolInfo, PoolOptions, PoolType, TasksQueueOptions diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 3127557e..58fc28ce 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -13,8 +13,10 @@ import { type IPool, PoolEmitter, PoolEvents, + type PoolInfo, type PoolOptions, - PoolType, + type PoolType, + PoolTypes, type TasksQueueOptions } from './pool' import type { IWorker, Task, TasksUsage, WorkerNode } from './worker' @@ -133,7 +135,7 @@ export abstract class AbstractPool< throw new RangeError( 'Cannot instantiate a pool with a negative number of workers' ) - } else if (this.type === PoolType.FIXED && numberOfWorkers === 0) { + } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) { throw new Error('Cannot instantiate a fixed pool with no worker') } } @@ -185,7 +187,7 @@ export abstract class AbstractPool< } if ( workerChoiceStrategyOptions.weights != null && - Object.keys(workerChoiceStrategyOptions.weights).length !== this.size + Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize ) { throw new Error( 'Invalid worker choice strategy options: must have a weight for each worker node' @@ -212,30 +214,48 @@ export abstract class AbstractPool< public abstract get type (): PoolType /** @inheritDoc */ - public abstract get size (): number + public get info (): PoolInfo { + return { + type: this.type, + minSize: this.minSize, + maxSize: this.maxSize, + workerNodes: this.workerNodes.length, + idleWorkerNodes: this.workerNodes.reduce( + (accumulator, workerNode) => + workerNode.tasksUsage.running === 0 ? accumulator + 1 : accumulator, + 0 + ), + busyWorkerNodes: this.workerNodes.reduce( + (accumulator, workerNode) => + workerNode.tasksUsage.running > 0 ? accumulator + 1 : accumulator, + 0 + ), + runningTasks: this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator + workerNode.tasksUsage.running, + 0 + ), + queuedTasks: this.workerNodes.reduce( + (accumulator, workerNode) => accumulator + workerNode.tasksQueue.size, + 0 + ), + maxQueuedTasks: this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator + workerNode.tasksQueue.maxSize, + 0 + ) + } + } /** - * Number of tasks running in the pool. + * Pool minimum size. */ - private get numberOfRunningTasks (): number { - return this.workerNodes.reduce( - (accumulator, workerNode) => accumulator + workerNode.tasksUsage.running, - 0 - ) - } + protected abstract get minSize (): number /** - * Number of tasks queued in the pool. + * Pool maximum size. */ - private get numberOfQueuedTasks (): number { - if (this.opts.enableTasksQueue === false) { - return 0 - } - return this.workerNodes.reduce( - (accumulator, workerNode) => accumulator + workerNode.tasksQueue.size, - 0 - ) - } + protected abstract get maxSize (): number /** * Gets the given worker its worker node key. @@ -497,7 +517,7 @@ export abstract class AbstractPool< */ protected chooseWorkerNode (): number { let workerNodeKey: number - if (this.type === PoolType.DYNAMIC && !this.full && this.internalBusy()) { + if (this.type === PoolTypes.dynamic && !this.full && this.internalBusy()) { const workerCreated = this.createAndSetupWorker() this.registerWorkerMessageListener(workerCreated, message => { const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated) @@ -627,17 +647,11 @@ export abstract class AbstractPool< private checkAndEmitEvents (): void { if (this.emitter != null) { - const poolInfo = { - size: this.size, - workerNodes: this.workerNodes.length, - runningTasks: this.numberOfRunningTasks, - queuedTasks: this.numberOfQueuedTasks - } if (this.busy) { - this.emitter?.emit(PoolEvents.busy, poolInfo) + this.emitter?.emit(PoolEvents.busy, this.info) } - if (this.type === PoolType.DYNAMIC && this.full) { - this.emitter?.emit(PoolEvents.full, poolInfo) + if (this.type === PoolTypes.dynamic && this.full) { + this.emitter?.emit(PoolEvents.full, this.info) } } } diff --git a/src/pools/cluster/dynamic.ts b/src/pools/cluster/dynamic.ts index 67020577..d4f270a3 100644 --- a/src/pools/cluster/dynamic.ts +++ b/src/pools/cluster/dynamic.ts @@ -1,6 +1,5 @@ -import { PoolType } from '../pool' -import type { ClusterPoolOptions } from './fixed' -import { FixedClusterPool } from './fixed' +import { type PoolType, PoolTypes } from '../pool' +import { type ClusterPoolOptions, FixedClusterPool } from './fixed' /** * A cluster pool with a dynamic number of workers, but a guaranteed minimum number of workers. @@ -36,11 +35,11 @@ export class DynamicClusterPool< /** @inheritDoc */ public get type (): PoolType { - return PoolType.DYNAMIC + return PoolTypes.dynamic } /** @inheritDoc */ - public get size (): number { + protected get maxSize (): number { return this.max } diff --git a/src/pools/cluster/fixed.ts b/src/pools/cluster/fixed.ts index 89cab8ef..a0c6b16f 100644 --- a/src/pools/cluster/fixed.ts +++ b/src/pools/cluster/fixed.ts @@ -2,8 +2,7 @@ import type { ClusterSettings, Worker } from 'node:cluster' import cluster from 'node:cluster' import type { MessageValue } from '../../utility-types' import { AbstractPool } from '../abstract-pool' -import type { PoolOptions } from '../pool' -import { PoolType } from '../pool' +import { type PoolOptions, type PoolType, PoolTypes } from '../pool' /** * Options for a poolifier cluster pool. @@ -97,11 +96,16 @@ export class FixedClusterPool< /** @inheritDoc */ public get type (): PoolType { - return PoolType.FIXED + return PoolTypes.fixed } /** @inheritDoc */ - public get size (): number { + protected get minSize (): number { + return this.numberOfWorkers + } + + /** @inheritDoc */ + protected get maxSize (): number { return this.numberOfWorkers } diff --git a/src/pools/pool.ts b/src/pools/pool.ts index 32adfc66..0013bf5d 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -13,21 +13,23 @@ import type { } from './selection-strategies/selection-strategies-types' /** - * Pool types. - * - * @enum - * @internal + * Enumeration of pool types. */ -export enum PoolType { +export const PoolTypes = Object.freeze({ /** * Fixed pool type. */ - FIXED = 'fixed', + fixed: 'fixed', /** * Dynamic pool type. */ - DYNAMIC = 'dynamic' -} + dynamic: 'dynamic' +} as const) + +/** + * Pool type. + */ +export type PoolType = keyof typeof PoolTypes /** * Pool events emitter. @@ -49,6 +51,21 @@ export const PoolEvents = Object.freeze({ */ export type PoolEvent = keyof typeof PoolEvents +/** + * Pool information. + */ +export interface PoolInfo { + type: PoolType + minSize: number + maxSize: number + workerNodes: number + idleWorkerNodes: number + busyWorkerNodes: number + runningTasks: number + queuedTasks: number + maxQueuedTasks: number +} + /** * Worker tasks queue options. */ @@ -134,9 +151,9 @@ export interface IPool< */ readonly type: PoolType /** - * Pool maximum size. + * Pool information. */ - readonly size: number + readonly info: PoolInfo /** * Pool worker nodes. */ diff --git a/src/pools/thread/dynamic.ts b/src/pools/thread/dynamic.ts index 0519873e..00d25c0a 100644 --- a/src/pools/thread/dynamic.ts +++ b/src/pools/thread/dynamic.ts @@ -1,7 +1,5 @@ -import type { PoolOptions } from '../pool' -import { PoolType } from '../pool' -import type { ThreadWorkerWithMessageChannel } from './fixed' -import { FixedThreadPool } from './fixed' +import { type PoolOptions, type PoolType, PoolTypes } from '../pool' +import { FixedThreadPool, type ThreadWorkerWithMessageChannel } from './fixed' /** * A thread pool with a dynamic number of threads, but a guaranteed minimum number of threads. @@ -37,7 +35,7 @@ export class DynamicThreadPool< /** @inheritDoc */ public get type (): PoolType { - return PoolType.DYNAMIC + return PoolTypes.dynamic } /** @inheritDoc */ @@ -46,7 +44,7 @@ export class DynamicThreadPool< } /** @inheritDoc */ - public get size (): number { + protected get maxSize (): number { return this.max } diff --git a/src/pools/thread/fixed.ts b/src/pools/thread/fixed.ts index 816fa61a..9939a04b 100644 --- a/src/pools/thread/fixed.ts +++ b/src/pools/thread/fixed.ts @@ -6,8 +6,7 @@ import { } from 'node:worker_threads' import type { Draft, MessageValue } from '../../utility-types' import { AbstractPool } from '../abstract-pool' -import type { PoolOptions } from '../pool' -import { PoolType } from '../pool' +import { type PoolOptions, type PoolType, PoolTypes } from '../pool' /** * A thread worker with message channels for communication between main thread and thread worker. @@ -93,11 +92,16 @@ export class FixedThreadPool< /** @inheritDoc */ public get type (): PoolType { - return PoolType.FIXED + return PoolTypes.fixed } /** @inheritDoc */ - public get size (): number { + protected get minSize (): number { + return this.numberOfWorkers + } + + /** @inheritDoc */ + protected get maxSize (): number { return this.numberOfWorkers } diff --git a/src/queue.ts b/src/queue.ts index 7682f65f..86b69051 100644 --- a/src/queue.ts +++ b/src/queue.ts @@ -9,11 +9,13 @@ export class Queue { private items: Record private head: number private tail: number + private max: number public constructor () { this.items = {} this.head = 0 this.tail = 0 + this.max = 0 } /** @@ -26,6 +28,16 @@ export class Queue { return this.tail - this.head } + /** + * Get the maximum size of the queue. + * + * @returns The maximum size of the queue. + * @readonly + */ + public get maxSize (): number { + return this.max + } + /** * Enqueue an item. * @@ -35,6 +47,7 @@ export class Queue { public enqueue (item: T): number { this.items[this.tail] = item this.tail++ + if (this.size > this.max) this.max = this.size return this.size } diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index 91c942f6..b41ab1ad 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -5,7 +5,8 @@ const { FixedClusterPool, FixedThreadPool, PoolEvents, - WorkerChoiceStrategies + WorkerChoiceStrategies, + PoolTypes } = require('../../../lib') const { CircularArray } = require('../../../lib/circular-array') const { Queue } = require('../../../lib/queue') @@ -274,6 +275,42 @@ describe('Abstract pool test suite', () => { await pool.destroy() }) + it('Verify that pool info is set', async () => { + let pool = new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/testWorker.js' + ) + expect(pool.info).toStrictEqual({ + type: PoolTypes.fixed, + minSize: numberOfWorkers, + maxSize: numberOfWorkers, + workerNodes: numberOfWorkers, + idleWorkerNodes: numberOfWorkers, + busyWorkerNodes: 0, + runningTasks: 0, + queuedTasks: 0, + maxQueuedTasks: 0 + }) + await pool.destroy() + pool = new DynamicClusterPool( + numberOfWorkers, + numberOfWorkers * 2, + './tests/worker-files/thread/testWorker.js' + ) + expect(pool.info).toStrictEqual({ + type: PoolTypes.dynamic, + minSize: numberOfWorkers, + maxSize: numberOfWorkers * 2, + workerNodes: numberOfWorkers, + idleWorkerNodes: numberOfWorkers, + busyWorkerNodes: 0, + runningTasks: 0, + queuedTasks: 0, + maxQueuedTasks: 0 + }) + await pool.destroy() + }) + it('Simulate worker not found', async () => { const pool = new StubPoolWithRemoveAllWorker( numberOfWorkers, diff --git a/tests/pools/cluster/fixed.test.js b/tests/pools/cluster/fixed.test.js index ac5d1fda..9fc2b1b4 100644 --- a/tests/pools/cluster/fixed.test.js +++ b/tests/pools/cluster/fixed.test.js @@ -102,8 +102,11 @@ describe('Fixed cluster pool test suite', () => { expect(workerNode.tasksUsage.run).toBe(0) expect(workerNode.tasksQueue.size).toBeGreaterThan(0) } - expect(queuePool.numberOfRunningTasks).toBe(numberOfWorkers) - expect(queuePool.numberOfQueuedTasks).toBe( + expect(queuePool.info.runningTasks).toBe(numberOfWorkers) + expect(queuePool.info.queuedTasks).toBe( + numberOfWorkers * maxMultiplier - numberOfWorkers + ) + expect(queuePool.info.maxQueuedTasks).toBe( numberOfWorkers * maxMultiplier - numberOfWorkers ) await Promise.all(promises) diff --git a/tests/pools/thread/fixed.test.js b/tests/pools/thread/fixed.test.js index ef368968..dcb7fac7 100644 --- a/tests/pools/thread/fixed.test.js +++ b/tests/pools/thread/fixed.test.js @@ -102,8 +102,11 @@ describe('Fixed thread pool test suite', () => { expect(workerNode.tasksUsage.run).toBe(0) expect(workerNode.tasksQueue.size).toBeGreaterThan(0) } - expect(queuePool.numberOfRunningTasks).toBe(numberOfThreads) - expect(queuePool.numberOfQueuedTasks).toBe( + expect(queuePool.info.runningTasks).toBe(numberOfThreads) + expect(queuePool.info.queuedTasks).toBe( + numberOfThreads * maxMultiplier - numberOfThreads + ) + expect(queuePool.info.maxQueuedTasks).toBe( numberOfThreads * maxMultiplier - numberOfThreads ) await Promise.all(promises) -- 2.34.1