FixedClusterPool,
type ClusterPoolOptions
} from './pools/cluster/fixed'
-export { PoolEvents, PoolTypes, WorkerTypes } from './pools/pool'
+export { PoolEvents, PoolTypes } from './pools/pool'
export type {
IPool,
PoolEmitter,
PoolInfo,
PoolOptions,
PoolType,
- TasksQueueOptions,
- WorkerType
+ TasksQueueOptions
} from './pools/pool'
+export { WorkerTypes } from './pools/worker'
export type {
ErrorHandler,
EventLoopUtilizationMeasurementStatistics,
ExitHandler,
IWorker,
+ IWorkerNode,
MeasurementStatistics,
MessageHandler,
OnlineHandler,
Task,
TaskStatistics,
WorkerInfo,
- WorkerNode,
+ WorkerType,
WorkerUsage
} from './pools/worker'
export {
round
} from '../utils'
import { KillBehaviors } from '../worker/worker-options'
-import { CircularArray } from '../circular-array'
-import { Queue } from '../queue'
import {
type IPool,
PoolEmitter,
type PoolOptions,
type PoolType,
PoolTypes,
- type TasksQueueOptions,
- type WorkerType,
- WorkerTypes
+ type TasksQueueOptions
} from './pool'
import type {
IWorker,
+ IWorkerNode,
MessageHandler,
Task,
WorkerInfo,
- WorkerNode,
+ WorkerType,
WorkerUsage
} from './worker'
import {
} from './selection-strategies/selection-strategies-types'
import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
import { version } from './version'
+import { WorkerNode } from './worker-node'
/**
* Base class that implements some shared logic for all poolifier pools.
Response = unknown
> implements IPool<Worker, Data, Response> {
/** @inheritDoc */
- public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
+ public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
/** @inheritDoc */
public readonly emitter?: PoolEmitter
this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
}
for (const workerNode of this.workerNodes) {
- this.setWorkerNodeTasksUsage(
- workerNode,
- this.getInitialWorkerUsage(workerNode.worker)
- )
+ workerNode.resetUsage()
this.setWorkerStatistics(workerNode.worker)
}
}
}
}
- /**
- * Sets the given worker node its tasks usage in the pool.
- *
- * @param workerNode - The worker node.
- * @param workerUsage - The worker usage.
- */
- private setWorkerNodeTasksUsage (
- workerNode: WorkerNode<Worker, Data>,
- workerUsage: WorkerUsage
- ): void {
- workerNode.usage = workerUsage
- }
-
/**
* Gets the worker information.
*
* @returns The worker nodes length.
*/
private pushWorkerNode (worker: Worker): number {
- this.workerNodes.push({
- worker,
- info: this.getInitialWorkerInfo(worker),
- usage: this.getInitialWorkerUsage(),
- tasksQueue: new Queue<Task<Data>>()
- })
- this.setWorkerNodeTasksUsage(
- this.workerNodes[this.getWorkerNodeKey(worker)],
- this.getInitialWorkerUsage(worker)
- )
- return this.workerNodes.length
+ return this.workerNodes.push(new WorkerNode(worker, this.worker))
}
- /**
- * Gets the worker id.
- *
- * @param worker - The worker.
- * @returns The worker id.
- */
- private getWorkerId (worker: Worker): number | undefined {
- if (this.worker === WorkerTypes.thread) {
- return worker.threadId
- } else if (this.worker === WorkerTypes.cluster) {
- return worker.id
- }
- }
-
- // /**
- // * Sets the given worker in the pool worker nodes.
- // *
- // * @param workerNodeKey - The worker node key.
- // * @param worker - The worker.
- // * @param workerInfo - The worker info.
- // * @param workerUsage - The worker usage.
- // * @param tasksQueue - The worker task queue.
- // */
- // private setWorkerNode (
- // workerNodeKey: number,
- // worker: Worker,
- // workerInfo: WorkerInfo,
- // workerUsage: WorkerUsage,
- // tasksQueue: Queue<Task<Data>>
- // ): void {
- // this.workerNodes[workerNodeKey] = {
- // worker,
- // info: workerInfo,
- // usage: workerUsage,
- // tasksQueue
- // }
- // }
-
/**
* Removes the given worker from the pool worker nodes.
*
}
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
- return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
+ return this.workerNodes[workerNodeKey].enqueueTask(task)
}
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
- return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
+ return this.workerNodes[workerNodeKey].dequeueTask()
}
private tasksQueueSize (workerNodeKey: number): number {
- return this.workerNodes[workerNodeKey].tasksQueue.size
- }
-
- private tasksMaxQueueSize (workerNodeKey: number): number {
- return this.workerNodes[workerNodeKey].tasksQueue.maxSize
+ return this.workerNodes[workerNodeKey].tasksQueueSize()
}
private flushTasksQueue (workerNodeKey: number): void {
this.dequeueTask(workerNodeKey) as Task<Data>
)
}
- this.workerNodes[workerNodeKey].tasksQueue.clear()
+ this.workerNodes[workerNodeKey].clearTasksQueue()
}
private flushTasksQueues (): void {
}
})
}
-
- private getInitialWorkerUsage (worker?: Worker): WorkerUsage {
- const getTasksQueueSize = (worker?: Worker): number => {
- if (worker == null) {
- return 0
- }
- return this.tasksQueueSize(this.getWorkerNodeKey(worker))
- }
- const getTasksMaxQueueSize = (worker?: Worker): number => {
- if (worker == null) {
- return 0
- }
- return this.tasksMaxQueueSize(this.getWorkerNodeKey(worker))
- }
- return {
- tasks: {
- executed: 0,
- executing: 0,
- get queued (): number {
- return getTasksQueueSize(worker)
- },
- get maxQueued (): number {
- return getTasksMaxQueueSize(worker)
- },
- failed: 0
- },
- runTime: {
- history: new CircularArray()
- },
- waitTime: {
- history: new CircularArray()
- },
- elu: {
- idle: {
- history: new CircularArray()
- },
- active: {
- history: new CircularArray()
- }
- }
- }
- }
-
- private getInitialWorkerInfo (worker: Worker): WorkerInfo {
- return { id: this.getWorkerId(worker), dynamic: false, started: true }
- }
}
import cluster, { type ClusterSettings, type Worker } from 'node:cluster'
import type { MessageValue } from '../../utility-types'
import { AbstractPool } from '../abstract-pool'
-import {
- type PoolOptions,
- type PoolType,
- PoolTypes,
- type WorkerType,
- WorkerTypes
-} from '../pool'
+import { type PoolOptions, type PoolType, PoolTypes } from '../pool'
+import { type WorkerType, WorkerTypes } from '../worker'
/**
* Options for a poolifier cluster pool.
ErrorHandler,
ExitHandler,
IWorker,
+ IWorkerNode,
MessageHandler,
OnlineHandler,
- WorkerNode
+ WorkerType
} from './worker'
import type {
WorkerChoiceStrategy,
*/
export type PoolType = keyof typeof PoolTypes
-/**
- * Enumeration of worker types.
- */
-export const WorkerTypes = Object.freeze({
- cluster: 'cluster',
- thread: 'thread'
-} as const)
-
-/**
- * Worker type.
- */
-export type WorkerType = keyof typeof WorkerTypes
-
/**
* Pool events emitter.
*/
* Pool information.
*/
export interface PoolInfo {
- version: string
- type: PoolType
- worker: WorkerType
- minSize: number
- maxSize: number
+ readonly version: string
+ readonly type: PoolType
+ readonly worker: WorkerType
+ readonly minSize: number
+ readonly maxSize: number
/** Pool utilization ratio. */
- utilization?: number
+ readonly utilization?: number
/** Pool total worker nodes */
- workerNodes: number
+ readonly workerNodes: number
/** Pool idle worker nodes */
- idleWorkerNodes: number
+ readonly idleWorkerNodes: number
/** Pool busy worker nodes */
- busyWorkerNodes: number
- executedTasks: number
- executingTasks: number
- queuedTasks: number
- maxQueuedTasks: number
- failedTasks: number
- runTime?: {
- minimum: number
- maximum: number
- average: number
- median?: number
+ readonly busyWorkerNodes: number
+ readonly executedTasks: number
+ readonly executingTasks: number
+ readonly queuedTasks: number
+ readonly maxQueuedTasks: number
+ readonly failedTasks: number
+ readonly runTime?: {
+ readonly minimum: number
+ readonly maximum: number
+ readonly average: number
+ readonly median?: number
}
- waitTime?: {
- minimum: number
- maximum: number
- average: number
- median?: number
+ readonly waitTime?: {
+ readonly minimum: number
+ readonly maximum: number
+ readonly average: number
+ readonly median?: number
}
}
/**
* Pool worker nodes.
*/
- readonly workerNodes: Array<WorkerNode<Worker, Data>>
+ readonly workerNodes: Array<IWorkerNode<Worker, Data>>
/**
* Emitter on which events can be listened to.
*
* @param name - The name of the worker function to execute. If not specified, the default worker function will be executed.
* @returns Promise that will be fulfilled when the task is completed.
*/
- execute: (data?: Data, name?: string) => Promise<Response>
+ readonly execute: (data?: Data, name?: string) => Promise<Response>
/**
* Terminates every current worker in this pool.
*/
- destroy: () => Promise<void>
+ readonly destroy: () => Promise<void>
/**
* Sets the worker choice strategy in this pool.
*
* @param workerChoiceStrategy - The worker choice strategy.
* @param workerChoiceStrategyOptions - The worker choice strategy options.
*/
- setWorkerChoiceStrategy: (
+ readonly setWorkerChoiceStrategy: (
workerChoiceStrategy: WorkerChoiceStrategy,
workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
) => void
*
* @param workerChoiceStrategyOptions - The worker choice strategy options.
*/
- setWorkerChoiceStrategyOptions: (
+ readonly setWorkerChoiceStrategyOptions: (
workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
) => void
/**
* @param enable - Whether to enable or disable the worker tasks queue.
* @param tasksQueueOptions - The worker tasks queue options.
*/
- enableTasksQueue: (
+ readonly enableTasksQueue: (
enable: boolean,
tasksQueueOptions?: TasksQueueOptions
) => void
*
* @param tasksQueueOptions - The worker tasks queue options.
*/
- setTasksQueueOptions: (tasksQueueOptions: TasksQueueOptions) => void
+ readonly setTasksQueueOptions: (tasksQueueOptions: TasksQueueOptions) => void
}
*
* @returns `true` if the reset is successful, `false` otherwise.
*/
- reset: () => boolean
+ readonly reset: () => boolean
/**
* Updates the worker node key strategy internals.
*
* @returns `true` if the update is successful, `false` otherwise.
*/
- update: (workerNodeKey: number) => boolean
+ readonly update: (workerNodeKey: number) => boolean
/**
* Chooses a worker node in the pool and returns its key.
*
* @returns The worker node key.
*/
- choose: () => number
+ readonly choose: () => number
/**
* Removes the worker node key from strategy internals.
*
* @param workerNodeKey - The worker node key.
* @returns `true` if the worker node key is removed, `false` otherwise.
*/
- remove: (workerNodeKey: number) => boolean
+ readonly remove: (workerNodeKey: number) => boolean
/**
* Sets the worker choice strategy options.
*
* @param opts - The worker choice strategy options.
*/
- setOptions: (opts: WorkerChoiceStrategyOptions) => void
+ readonly setOptions: (opts: WorkerChoiceStrategyOptions) => void
}
} from 'node:worker_threads'
import type { MessageValue } from '../../utility-types'
import { AbstractPool } from '../abstract-pool'
-import {
- type PoolOptions,
- type PoolType,
- PoolTypes,
- type WorkerType,
- WorkerTypes
-} from '../pool'
+import { type PoolOptions, type PoolType, PoolTypes } from '../pool'
+import { type WorkerType, WorkerTypes } from '../worker'
/**
* Options for a poolifier thread pool.
--- /dev/null
+import { CircularArray } from '../circular-array'
+import { Queue } from '../queue'
+import {
+ type IWorker,
+ type IWorkerNode,
+ type Task,
+ type WorkerInfo,
+ type WorkerType,
+ WorkerTypes,
+ type WorkerUsage
+} from './worker'
+
+export class WorkerNode<Worker extends IWorker, Data = unknown>
+implements IWorkerNode<Worker, Data> {
+ public readonly worker: Worker
+ public readonly info: WorkerInfo
+ public usage: WorkerUsage
+ private readonly tasksQueue: Queue<Task<Data>>
+
+ constructor (worker: Worker, workerType: WorkerType) {
+ this.worker = worker
+ this.info = this.initWorkerInfo(worker, workerType)
+ this.usage = this.initWorkerUsage()
+ this.tasksQueue = new Queue<Task<Data>>()
+ }
+
+ /** @inheritdoc */
+ public tasksQueueSize (): number {
+ return this.tasksQueue.size
+ }
+
+ /**
+ * Worker node tasks queue maximum size.
+ *
+ * @returns The tasks queue maximum size.
+ */
+ private tasksQueueMaxSize (): number {
+ return this.tasksQueue.maxSize
+ }
+
+ /** @inheritdoc */
+ public enqueueTask (task: Task<Data>): number {
+ return this.tasksQueue.enqueue(task)
+ }
+
+ /** @inheritdoc */
+ public dequeueTask (): Task<Data> | undefined {
+ return this.tasksQueue.dequeue()
+ }
+
+ /** @inheritdoc */
+ public clearTasksQueue (): void {
+ this.tasksQueue.clear()
+ }
+
+ public resetUsage (): void {
+ this.usage = this.initWorkerUsage()
+ }
+
+ private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo {
+ return {
+ id: this.getWorkerId(worker, workerType),
+ type: workerType,
+ dynamic: false,
+ started: true
+ }
+ }
+
+ private initWorkerUsage (): WorkerUsage {
+ const getTasksQueueSize = (): number => {
+ return this.tasksQueueSize()
+ }
+ const getTasksMaxQueueSize = (): number => {
+ return this.tasksQueueMaxSize()
+ }
+ return {
+ tasks: {
+ executed: 0,
+ executing: 0,
+ get queued (): number {
+ return getTasksQueueSize()
+ },
+ get maxQueued (): number {
+ return getTasksMaxQueueSize()
+ },
+ failed: 0
+ },
+ runTime: {
+ history: new CircularArray()
+ },
+ waitTime: {
+ history: new CircularArray()
+ },
+ elu: {
+ idle: {
+ history: new CircularArray()
+ },
+ active: {
+ history: new CircularArray()
+ }
+ }
+ }
+ }
+
+ /**
+ * Gets the worker id.
+ *
+ * @param worker - The worker.
+ * @returns The worker id.
+ */
+ private getWorkerId (
+ worker: Worker,
+ workerType: WorkerType
+ ): number | undefined {
+ if (workerType === WorkerTypes.thread) {
+ return worker.threadId
+ } else if (workerType === WorkerTypes.cluster) {
+ return worker.id
+ }
+ }
+}
import type { CircularArray } from '../circular-array'
-import type { Queue } from '../queue'
/**
* Callback invoked if the worker has received a message.
failed: number
}
+/**
+ * Enumeration of worker types.
+ */
+export const WorkerTypes = Object.freeze({
+ cluster: 'cluster',
+ thread: 'thread'
+} as const)
+
+/**
+ * Worker type.
+ */
+export type WorkerType = keyof typeof WorkerTypes
+
/**
* Worker information.
*
* Worker id.
*/
readonly id: number | undefined
+ /**
+ * Worker type.
+ */
+ type: WorkerType
/**
* Dynamic flag.
*/
* @param event - The event.
* @param handler - The event handler.
*/
- on: ((event: 'message', handler: MessageHandler<this>) => void) &
+ readonly on: ((event: 'message', handler: MessageHandler<this>) => void) &
((event: 'error', handler: ErrorHandler<this>) => void) &
((event: 'online', handler: OnlineHandler<this>) => void) &
((event: 'exit', handler: ExitHandler<this>) => void)
* @param event - `'exit'`.
* @param handler - The exit handler.
*/
- once: (event: 'exit', handler: ExitHandler<this>) => void
+ readonly once: (event: 'exit', handler: ExitHandler<this>) => void
}
/**
* @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
* @internal
*/
-export interface WorkerNode<Worker extends IWorker, Data = unknown> {
+export interface IWorkerNode<Worker extends IWorker, Data = unknown> {
/**
* Worker node worker.
*/
*/
usage: WorkerUsage
/**
- * Worker node tasks queue.
+ * Worker node tasks queue size.
+ *
+ * @returns The tasks queue size.
+ */
+ readonly tasksQueueSize: () => number
+ /**
+ * Worker node enqueue task.
+ *
+ * @param task - The task to queue.
+ * @returns The task queue size.
+ */
+ readonly enqueueTask: (task: Task<Data>) => number
+ /**
+ * Worker node dequeue task.
+ *
+ * @returns The dequeued task.
+ */
+ readonly dequeueTask: () => Task<Data> | undefined
+ /**
+ * Worker node clear tasks queue.
+ */
+ readonly clearTasksQueue: () => void
+ /**
+ * Worker node reset usage statistics .
*/
- readonly tasksQueue: Queue<Task<Data>>
+ readonly resetUsage: () => void
}