From 4b628b4844b461e434c5945feead43b0cc7aab01 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Fri, 7 Jul 2023 18:08:11 +0200 Subject: [PATCH] refactor: encapsulate worker node handling logic into its own class MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/index.ts | 9 +- src/pools/abstract-pool.ts | 138 ++---------------- src/pools/cluster/fixed.ts | 9 +- src/pools/pool.ts | 78 +++++----- .../selection-strategies-types.ts | 10 +- src/pools/thread/fixed.ts | 9 +- src/pools/worker-node.ts | 121 +++++++++++++++ src/pools/worker.ts | 51 ++++++- 8 files changed, 224 insertions(+), 201 deletions(-) create mode 100644 src/pools/worker-node.ts diff --git a/src/index.ts b/src/index.ts index eeda859e..54c352fc 100644 --- a/src/index.ts +++ b/src/index.ts @@ -4,7 +4,7 @@ export { FixedClusterPool, type ClusterPoolOptions } from './pools/cluster/fixed' -export { PoolEvents, PoolTypes, WorkerTypes } from './pools/pool' +export { PoolEvents, PoolTypes } from './pools/pool' export type { IPool, PoolEmitter, @@ -12,21 +12,22 @@ export type { PoolInfo, PoolOptions, PoolType, - TasksQueueOptions, - WorkerType + TasksQueueOptions } from './pools/pool' +export { WorkerTypes } from './pools/worker' export type { ErrorHandler, EventLoopUtilizationMeasurementStatistics, ExitHandler, IWorker, + IWorkerNode, MeasurementStatistics, MessageHandler, OnlineHandler, Task, TaskStatistics, WorkerInfo, - WorkerNode, + WorkerType, WorkerUsage } from './pools/worker' export { diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 4b1a05ff..097882b7 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -10,8 +10,6 @@ import { round } from '../utils' import { KillBehaviors } from '../worker/worker-options' -import { CircularArray } from '../circular-array' -import { Queue } from '../queue' import { type IPool, PoolEmitter, @@ -20,16 +18,15 @@ import { type PoolOptions, type PoolType, PoolTypes, - type TasksQueueOptions, - type WorkerType, - WorkerTypes + type TasksQueueOptions } from './pool' import type { IWorker, + IWorkerNode, MessageHandler, Task, WorkerInfo, - WorkerNode, + WorkerType, WorkerUsage } from './worker' import { @@ -40,6 +37,7 @@ import { } from './selection-strategies/selection-strategies-types' import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context' import { version } from './version' +import { WorkerNode } from './worker-node' /** * Base class that implements some shared logic for all poolifier pools. @@ -54,7 +52,7 @@ export abstract class AbstractPool< Response = unknown > implements IPool { /** @inheritDoc */ - public readonly workerNodes: Array> = [] + public readonly workerNodes: Array> = [] /** @inheritDoc */ public readonly emitter?: PoolEmitter @@ -463,10 +461,7 @@ export abstract class AbstractPool< this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) } for (const workerNode of this.workerNodes) { - this.setWorkerNodeTasksUsage( - workerNode, - this.getInitialWorkerUsage(workerNode.worker) - ) + workerNode.resetUsage() this.setWorkerStatistics(workerNode.worker) } } @@ -1036,19 +1031,6 @@ export abstract class AbstractPool< } } - /** - * Sets the given worker node its tasks usage in the pool. - * - * @param workerNode - The worker node. - * @param workerUsage - The worker usage. - */ - private setWorkerNodeTasksUsage ( - workerNode: WorkerNode, - workerUsage: WorkerUsage - ): void { - workerNode.usage = workerUsage - } - /** * Gets the worker information. * @@ -1065,57 +1047,9 @@ export abstract class AbstractPool< * @returns The worker nodes length. */ private pushWorkerNode (worker: Worker): number { - this.workerNodes.push({ - worker, - info: this.getInitialWorkerInfo(worker), - usage: this.getInitialWorkerUsage(), - tasksQueue: new Queue>() - }) - this.setWorkerNodeTasksUsage( - this.workerNodes[this.getWorkerNodeKey(worker)], - this.getInitialWorkerUsage(worker) - ) - return this.workerNodes.length + return this.workerNodes.push(new WorkerNode(worker, this.worker)) } - /** - * Gets the worker id. - * - * @param worker - The worker. - * @returns The worker id. - */ - private getWorkerId (worker: Worker): number | undefined { - if (this.worker === WorkerTypes.thread) { - return worker.threadId - } else if (this.worker === WorkerTypes.cluster) { - return worker.id - } - } - - // /** - // * Sets the given worker in the pool worker nodes. - // * - // * @param workerNodeKey - The worker node key. - // * @param worker - The worker. - // * @param workerInfo - The worker info. - // * @param workerUsage - The worker usage. - // * @param tasksQueue - The worker task queue. - // */ - // private setWorkerNode ( - // workerNodeKey: number, - // worker: Worker, - // workerInfo: WorkerInfo, - // workerUsage: WorkerUsage, - // tasksQueue: Queue> - // ): void { - // this.workerNodes[workerNodeKey] = { - // worker, - // info: workerInfo, - // usage: workerUsage, - // tasksQueue - // } - // } - /** * Removes the given worker from the pool worker nodes. * @@ -1135,19 +1069,15 @@ export abstract class AbstractPool< } private enqueueTask (workerNodeKey: number, task: Task): number { - return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task) + return this.workerNodes[workerNodeKey].enqueueTask(task) } private dequeueTask (workerNodeKey: number): Task | undefined { - return this.workerNodes[workerNodeKey].tasksQueue.dequeue() + return this.workerNodes[workerNodeKey].dequeueTask() } private tasksQueueSize (workerNodeKey: number): number { - return this.workerNodes[workerNodeKey].tasksQueue.size - } - - private tasksMaxQueueSize (workerNodeKey: number): number { - return this.workerNodes[workerNodeKey].tasksQueue.maxSize + return this.workerNodes[workerNodeKey].tasksQueueSize() } private flushTasksQueue (workerNodeKey: number): void { @@ -1157,7 +1087,7 @@ export abstract class AbstractPool< this.dequeueTask(workerNodeKey) as Task ) } - this.workerNodes[workerNodeKey].tasksQueue.clear() + this.workerNodes[workerNodeKey].clearTasksQueue() } private flushTasksQueues (): void { @@ -1177,50 +1107,4 @@ export abstract class AbstractPool< } }) } - - private getInitialWorkerUsage (worker?: Worker): WorkerUsage { - const getTasksQueueSize = (worker?: Worker): number => { - if (worker == null) { - return 0 - } - return this.tasksQueueSize(this.getWorkerNodeKey(worker)) - } - const getTasksMaxQueueSize = (worker?: Worker): number => { - if (worker == null) { - return 0 - } - return this.tasksMaxQueueSize(this.getWorkerNodeKey(worker)) - } - return { - tasks: { - executed: 0, - executing: 0, - get queued (): number { - return getTasksQueueSize(worker) - }, - get maxQueued (): number { - return getTasksMaxQueueSize(worker) - }, - failed: 0 - }, - runTime: { - history: new CircularArray() - }, - waitTime: { - history: new CircularArray() - }, - elu: { - idle: { - history: new CircularArray() - }, - active: { - history: new CircularArray() - } - } - } - } - - private getInitialWorkerInfo (worker: Worker): WorkerInfo { - return { id: this.getWorkerId(worker), dynamic: false, started: true } - } } diff --git a/src/pools/cluster/fixed.ts b/src/pools/cluster/fixed.ts index a88197f7..fe920f8a 100644 --- a/src/pools/cluster/fixed.ts +++ b/src/pools/cluster/fixed.ts @@ -1,13 +1,8 @@ import cluster, { type ClusterSettings, type Worker } from 'node:cluster' import type { MessageValue } from '../../utility-types' import { AbstractPool } from '../abstract-pool' -import { - type PoolOptions, - type PoolType, - PoolTypes, - type WorkerType, - WorkerTypes -} from '../pool' +import { type PoolOptions, type PoolType, PoolTypes } from '../pool' +import { type WorkerType, WorkerTypes } from '../worker' /** * Options for a poolifier cluster pool. diff --git a/src/pools/pool.ts b/src/pools/pool.ts index 5be4f95a..b1892752 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -3,9 +3,10 @@ import type { ErrorHandler, ExitHandler, IWorker, + IWorkerNode, MessageHandler, OnlineHandler, - WorkerNode + WorkerType } from './worker' import type { WorkerChoiceStrategy, @@ -31,19 +32,6 @@ export const PoolTypes = Object.freeze({ */ export type PoolType = keyof typeof PoolTypes -/** - * Enumeration of worker types. - */ -export const WorkerTypes = Object.freeze({ - cluster: 'cluster', - thread: 'thread' -} as const) - -/** - * Worker type. - */ -export type WorkerType = keyof typeof WorkerTypes - /** * Pool events emitter. */ @@ -68,35 +56,35 @@ export type PoolEvent = keyof typeof PoolEvents * Pool information. */ export interface PoolInfo { - version: string - type: PoolType - worker: WorkerType - minSize: number - maxSize: number + readonly version: string + readonly type: PoolType + readonly worker: WorkerType + readonly minSize: number + readonly maxSize: number /** Pool utilization ratio. */ - utilization?: number + readonly utilization?: number /** Pool total worker nodes */ - workerNodes: number + readonly workerNodes: number /** Pool idle worker nodes */ - idleWorkerNodes: number + readonly idleWorkerNodes: number /** Pool busy worker nodes */ - busyWorkerNodes: number - executedTasks: number - executingTasks: number - queuedTasks: number - maxQueuedTasks: number - failedTasks: number - runTime?: { - minimum: number - maximum: number - average: number - median?: number + readonly busyWorkerNodes: number + readonly executedTasks: number + readonly executingTasks: number + readonly queuedTasks: number + readonly maxQueuedTasks: number + readonly failedTasks: number + readonly runTime?: { + readonly minimum: number + readonly maximum: number + readonly average: number + readonly median?: number } - waitTime?: { - minimum: number - maximum: number - average: number - median?: number + readonly waitTime?: { + readonly minimum: number + readonly maximum: number + readonly average: number + readonly median?: number } } @@ -185,7 +173,7 @@ export interface IPool< /** * Pool worker nodes. */ - readonly workerNodes: Array> + readonly workerNodes: Array> /** * Emitter on which events can be listened to. * @@ -204,18 +192,18 @@ export interface IPool< * @param name - The name of the worker function to execute. If not specified, the default worker function will be executed. * @returns Promise that will be fulfilled when the task is completed. */ - execute: (data?: Data, name?: string) => Promise + readonly execute: (data?: Data, name?: string) => Promise /** * Terminates every current worker in this pool. */ - destroy: () => Promise + readonly destroy: () => Promise /** * Sets the worker choice strategy in this pool. * * @param workerChoiceStrategy - The worker choice strategy. * @param workerChoiceStrategyOptions - The worker choice strategy options. */ - setWorkerChoiceStrategy: ( + readonly setWorkerChoiceStrategy: ( workerChoiceStrategy: WorkerChoiceStrategy, workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions ) => void @@ -224,7 +212,7 @@ export interface IPool< * * @param workerChoiceStrategyOptions - The worker choice strategy options. */ - setWorkerChoiceStrategyOptions: ( + readonly setWorkerChoiceStrategyOptions: ( workerChoiceStrategyOptions: WorkerChoiceStrategyOptions ) => void /** @@ -233,7 +221,7 @@ export interface IPool< * @param enable - Whether to enable or disable the worker tasks queue. * @param tasksQueueOptions - The worker tasks queue options. */ - enableTasksQueue: ( + readonly enableTasksQueue: ( enable: boolean, tasksQueueOptions?: TasksQueueOptions ) => void @@ -242,5 +230,5 @@ export interface IPool< * * @param tasksQueueOptions - The worker tasks queue options. */ - setTasksQueueOptions: (tasksQueueOptions: TasksQueueOptions) => void + readonly setTasksQueueOptions: (tasksQueueOptions: TasksQueueOptions) => void } diff --git a/src/pools/selection-strategies/selection-strategies-types.ts b/src/pools/selection-strategies/selection-strategies-types.ts index 9f68c22f..628e3095 100644 --- a/src/pools/selection-strategies/selection-strategies-types.ts +++ b/src/pools/selection-strategies/selection-strategies-types.ts @@ -169,30 +169,30 @@ export interface IWorkerChoiceStrategy { * * @returns `true` if the reset is successful, `false` otherwise. */ - reset: () => boolean + readonly reset: () => boolean /** * Updates the worker node key strategy internals. * * @returns `true` if the update is successful, `false` otherwise. */ - update: (workerNodeKey: number) => boolean + readonly update: (workerNodeKey: number) => boolean /** * Chooses a worker node in the pool and returns its key. * * @returns The worker node key. */ - choose: () => number + readonly choose: () => number /** * Removes the worker node key from strategy internals. * * @param workerNodeKey - The worker node key. * @returns `true` if the worker node key is removed, `false` otherwise. */ - remove: (workerNodeKey: number) => boolean + readonly remove: (workerNodeKey: number) => boolean /** * Sets the worker choice strategy options. * * @param opts - The worker choice strategy options. */ - setOptions: (opts: WorkerChoiceStrategyOptions) => void + readonly setOptions: (opts: WorkerChoiceStrategyOptions) => void } diff --git a/src/pools/thread/fixed.ts b/src/pools/thread/fixed.ts index ac629e1f..9861a3c1 100644 --- a/src/pools/thread/fixed.ts +++ b/src/pools/thread/fixed.ts @@ -6,13 +6,8 @@ import { } from 'node:worker_threads' import type { MessageValue } from '../../utility-types' import { AbstractPool } from '../abstract-pool' -import { - type PoolOptions, - type PoolType, - PoolTypes, - type WorkerType, - WorkerTypes -} from '../pool' +import { type PoolOptions, type PoolType, PoolTypes } from '../pool' +import { type WorkerType, WorkerTypes } from '../worker' /** * Options for a poolifier thread pool. diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts new file mode 100644 index 00000000..a493b65d --- /dev/null +++ b/src/pools/worker-node.ts @@ -0,0 +1,121 @@ +import { CircularArray } from '../circular-array' +import { Queue } from '../queue' +import { + type IWorker, + type IWorkerNode, + type Task, + type WorkerInfo, + type WorkerType, + WorkerTypes, + type WorkerUsage +} from './worker' + +export class WorkerNode +implements IWorkerNode { + public readonly worker: Worker + public readonly info: WorkerInfo + public usage: WorkerUsage + private readonly tasksQueue: Queue> + + constructor (worker: Worker, workerType: WorkerType) { + this.worker = worker + this.info = this.initWorkerInfo(worker, workerType) + this.usage = this.initWorkerUsage() + this.tasksQueue = new Queue>() + } + + /** @inheritdoc */ + public tasksQueueSize (): number { + return this.tasksQueue.size + } + + /** + * Worker node tasks queue maximum size. + * + * @returns The tasks queue maximum size. + */ + private tasksQueueMaxSize (): number { + return this.tasksQueue.maxSize + } + + /** @inheritdoc */ + public enqueueTask (task: Task): number { + return this.tasksQueue.enqueue(task) + } + + /** @inheritdoc */ + public dequeueTask (): Task | undefined { + return this.tasksQueue.dequeue() + } + + /** @inheritdoc */ + public clearTasksQueue (): void { + this.tasksQueue.clear() + } + + public resetUsage (): void { + this.usage = this.initWorkerUsage() + } + + private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo { + return { + id: this.getWorkerId(worker, workerType), + type: workerType, + dynamic: false, + started: true + } + } + + private initWorkerUsage (): WorkerUsage { + const getTasksQueueSize = (): number => { + return this.tasksQueueSize() + } + const getTasksMaxQueueSize = (): number => { + return this.tasksQueueMaxSize() + } + return { + tasks: { + executed: 0, + executing: 0, + get queued (): number { + return getTasksQueueSize() + }, + get maxQueued (): number { + return getTasksMaxQueueSize() + }, + failed: 0 + }, + runTime: { + history: new CircularArray() + }, + waitTime: { + history: new CircularArray() + }, + elu: { + idle: { + history: new CircularArray() + }, + active: { + history: new CircularArray() + } + } + } + } + + /** + * Gets the worker id. + * + * @param worker - The worker. + * @returns The worker id. + */ + private getWorkerId ( + worker: Worker, + workerType: WorkerType + ): number | undefined { + if (workerType === WorkerTypes.thread) { + return worker.threadId + } else if (workerType === WorkerTypes.cluster) { + return worker.id + } + } +} diff --git a/src/pools/worker.ts b/src/pools/worker.ts index d071413b..81b49d3b 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -1,5 +1,4 @@ import type { CircularArray } from '../circular-array' -import type { Queue } from '../queue' /** * Callback invoked if the worker has received a message. @@ -126,6 +125,19 @@ export interface TaskStatistics { failed: number } +/** + * Enumeration of worker types. + */ +export const WorkerTypes = Object.freeze({ + cluster: 'cluster', + thread: 'thread' +} as const) + +/** + * Worker type. + */ +export type WorkerType = keyof typeof WorkerTypes + /** * Worker information. * @@ -136,6 +148,10 @@ export interface WorkerInfo { * Worker id. */ readonly id: number | undefined + /** + * Worker type. + */ + type: WorkerType /** * Dynamic flag. */ @@ -185,7 +201,7 @@ export interface IWorker { * @param event - The event. * @param handler - The event handler. */ - on: ((event: 'message', handler: MessageHandler) => void) & + readonly on: ((event: 'message', handler: MessageHandler) => void) & ((event: 'error', handler: ErrorHandler) => void) & ((event: 'online', handler: OnlineHandler) => void) & ((event: 'exit', handler: ExitHandler) => void) @@ -195,7 +211,7 @@ export interface IWorker { * @param event - `'exit'`. * @param handler - The exit handler. */ - once: (event: 'exit', handler: ExitHandler) => void + readonly once: (event: 'exit', handler: ExitHandler) => void } /** @@ -205,7 +221,7 @@ export interface IWorker { * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data. * @internal */ -export interface WorkerNode { +export interface IWorkerNode { /** * Worker node worker. */ @@ -219,7 +235,30 @@ export interface WorkerNode { */ usage: WorkerUsage /** - * Worker node tasks queue. + * Worker node tasks queue size. + * + * @returns The tasks queue size. + */ + readonly tasksQueueSize: () => number + /** + * Worker node enqueue task. + * + * @param task - The task to queue. + * @returns The task queue size. + */ + readonly enqueueTask: (task: Task) => number + /** + * Worker node dequeue task. + * + * @returns The dequeued task. + */ + readonly dequeueTask: () => Task | undefined + /** + * Worker node clear tasks queue. + */ + readonly clearTasksQueue: () => void + /** + * Worker node reset usage statistics . */ - readonly tasksQueue: Queue> + readonly resetUsage: () => void } -- 2.34.1