### Added
- Add `taskError` pool event for task execution error.
+- Add pool information `info` property to pool.
- Emit pool information on `busy` and `full` pool events.
## [2.5.1] - 2023-06-01
export { FixedClusterPool } from './pools/cluster/fixed'
export type { ClusterPoolOptions } from './pools/cluster/fixed'
export type { AbstractPool } from './pools/abstract-pool'
-export { PoolEvents } from './pools/pool'
+export { PoolEvents, PoolTypes } from './pools/pool'
export type {
IPool,
PoolEmitter,
PoolEvent,
+ PoolInfo,
PoolOptions,
PoolType,
TasksQueueOptions
type IPool,
PoolEmitter,
PoolEvents,
+ type PoolInfo,
type PoolOptions,
- PoolType,
+ type PoolType,
+ PoolTypes,
type TasksQueueOptions
} from './pool'
import type { IWorker, Task, TasksUsage, WorkerNode } from './worker'
throw new RangeError(
'Cannot instantiate a pool with a negative number of workers'
)
- } else if (this.type === PoolType.FIXED && numberOfWorkers === 0) {
+ } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
throw new Error('Cannot instantiate a fixed pool with no worker')
}
}
}
if (
workerChoiceStrategyOptions.weights != null &&
- Object.keys(workerChoiceStrategyOptions.weights).length !== this.size
+ Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
) {
throw new Error(
'Invalid worker choice strategy options: must have a weight for each worker node'
public abstract get type (): PoolType
/** @inheritDoc */
- public abstract get size (): number
+ public get info (): PoolInfo {
+ return {
+ type: this.type,
+ minSize: this.minSize,
+ maxSize: this.maxSize,
+ workerNodes: this.workerNodes.length,
+ idleWorkerNodes: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ workerNode.tasksUsage.running === 0 ? accumulator + 1 : accumulator,
+ 0
+ ),
+ busyWorkerNodes: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ workerNode.tasksUsage.running > 0 ? accumulator + 1 : accumulator,
+ 0
+ ),
+ runningTasks: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + workerNode.tasksUsage.running,
+ 0
+ ),
+ queuedTasks: this.workerNodes.reduce(
+ (accumulator, workerNode) => accumulator + workerNode.tasksQueue.size,
+ 0
+ ),
+ maxQueuedTasks: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + workerNode.tasksQueue.maxSize,
+ 0
+ )
+ }
+ }
/**
- * Number of tasks running in the pool.
+ * Pool minimum size.
*/
- private get numberOfRunningTasks (): number {
- return this.workerNodes.reduce(
- (accumulator, workerNode) => accumulator + workerNode.tasksUsage.running,
- 0
- )
- }
+ protected abstract get minSize (): number
/**
- * Number of tasks queued in the pool.
+ * Pool maximum size.
*/
- private get numberOfQueuedTasks (): number {
- if (this.opts.enableTasksQueue === false) {
- return 0
- }
- return this.workerNodes.reduce(
- (accumulator, workerNode) => accumulator + workerNode.tasksQueue.size,
- 0
- )
- }
+ protected abstract get maxSize (): number
/**
* Gets the given worker its worker node key.
*/
protected chooseWorkerNode (): number {
let workerNodeKey: number
- if (this.type === PoolType.DYNAMIC && !this.full && this.internalBusy()) {
+ if (this.type === PoolTypes.dynamic && !this.full && this.internalBusy()) {
const workerCreated = this.createAndSetupWorker()
this.registerWorkerMessageListener(workerCreated, message => {
const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated)
private checkAndEmitEvents (): void {
if (this.emitter != null) {
- const poolInfo = {
- size: this.size,
- workerNodes: this.workerNodes.length,
- runningTasks: this.numberOfRunningTasks,
- queuedTasks: this.numberOfQueuedTasks
- }
if (this.busy) {
- this.emitter?.emit(PoolEvents.busy, poolInfo)
+ this.emitter?.emit(PoolEvents.busy, this.info)
}
- if (this.type === PoolType.DYNAMIC && this.full) {
- this.emitter?.emit(PoolEvents.full, poolInfo)
+ if (this.type === PoolTypes.dynamic && this.full) {
+ this.emitter?.emit(PoolEvents.full, this.info)
}
}
}
-import { PoolType } from '../pool'
-import type { ClusterPoolOptions } from './fixed'
-import { FixedClusterPool } from './fixed'
+import { type PoolType, PoolTypes } from '../pool'
+import { type ClusterPoolOptions, FixedClusterPool } from './fixed'
/**
* A cluster pool with a dynamic number of workers, but a guaranteed minimum number of workers.
/** @inheritDoc */
public get type (): PoolType {
- return PoolType.DYNAMIC
+ return PoolTypes.dynamic
}
/** @inheritDoc */
- public get size (): number {
+ protected get maxSize (): number {
return this.max
}
import cluster from 'node:cluster'
import type { MessageValue } from '../../utility-types'
import { AbstractPool } from '../abstract-pool'
-import type { PoolOptions } from '../pool'
-import { PoolType } from '../pool'
+import { type PoolOptions, type PoolType, PoolTypes } from '../pool'
/**
* Options for a poolifier cluster pool.
/** @inheritDoc */
public get type (): PoolType {
- return PoolType.FIXED
+ return PoolTypes.fixed
}
/** @inheritDoc */
- public get size (): number {
+ protected get minSize (): number {
+ return this.numberOfWorkers
+ }
+
+ /** @inheritDoc */
+ protected get maxSize (): number {
return this.numberOfWorkers
}
} from './selection-strategies/selection-strategies-types'
/**
- * Pool types.
- *
- * @enum
- * @internal
+ * Enumeration of pool types.
*/
-export enum PoolType {
+export const PoolTypes = Object.freeze({
/**
* Fixed pool type.
*/
- FIXED = 'fixed',
+ fixed: 'fixed',
/**
* Dynamic pool type.
*/
- DYNAMIC = 'dynamic'
-}
+ dynamic: 'dynamic'
+} as const)
+
+/**
+ * Pool type.
+ */
+export type PoolType = keyof typeof PoolTypes
/**
* Pool events emitter.
*/
export type PoolEvent = keyof typeof PoolEvents
+/**
+ * Pool information.
+ */
+export interface PoolInfo {
+ type: PoolType
+ minSize: number
+ maxSize: number
+ workerNodes: number
+ idleWorkerNodes: number
+ busyWorkerNodes: number
+ runningTasks: number
+ queuedTasks: number
+ maxQueuedTasks: number
+}
+
/**
* Worker tasks queue options.
*/
*/
readonly type: PoolType
/**
- * Pool maximum size.
+ * Pool information.
*/
- readonly size: number
+ readonly info: PoolInfo
/**
* Pool worker nodes.
*/
-import type { PoolOptions } from '../pool'
-import { PoolType } from '../pool'
-import type { ThreadWorkerWithMessageChannel } from './fixed'
-import { FixedThreadPool } from './fixed'
+import { type PoolOptions, type PoolType, PoolTypes } from '../pool'
+import { FixedThreadPool, type ThreadWorkerWithMessageChannel } from './fixed'
/**
* A thread pool with a dynamic number of threads, but a guaranteed minimum number of threads.
/** @inheritDoc */
public get type (): PoolType {
- return PoolType.DYNAMIC
+ return PoolTypes.dynamic
}
/** @inheritDoc */
}
/** @inheritDoc */
- public get size (): number {
+ protected get maxSize (): number {
return this.max
}
} from 'node:worker_threads'
import type { Draft, MessageValue } from '../../utility-types'
import { AbstractPool } from '../abstract-pool'
-import type { PoolOptions } from '../pool'
-import { PoolType } from '../pool'
+import { type PoolOptions, type PoolType, PoolTypes } from '../pool'
/**
* A thread worker with message channels for communication between main thread and thread worker.
/** @inheritDoc */
public get type (): PoolType {
- return PoolType.FIXED
+ return PoolTypes.fixed
}
/** @inheritDoc */
- public get size (): number {
+ protected get minSize (): number {
+ return this.numberOfWorkers
+ }
+
+ /** @inheritDoc */
+ protected get maxSize (): number {
return this.numberOfWorkers
}
private items: Record<number, T>
private head: number
private tail: number
+ private max: number
public constructor () {
this.items = {}
this.head = 0
this.tail = 0
+ this.max = 0
}
/**
return this.tail - this.head
}
+ /**
+ * Get the maximum size of the queue.
+ *
+ * @returns The maximum size of the queue.
+ * @readonly
+ */
+ public get maxSize (): number {
+ return this.max
+ }
+
/**
* Enqueue an item.
*
public enqueue (item: T): number {
this.items[this.tail] = item
this.tail++
+ if (this.size > this.max) this.max = this.size
return this.size
}
FixedClusterPool,
FixedThreadPool,
PoolEvents,
- WorkerChoiceStrategies
+ WorkerChoiceStrategies,
+ PoolTypes
} = require('../../../lib')
const { CircularArray } = require('../../../lib/circular-array')
const { Queue } = require('../../../lib/queue')
await pool.destroy()
})
+ it('Verify that pool info is set', async () => {
+ let pool = new FixedThreadPool(
+ numberOfWorkers,
+ './tests/worker-files/thread/testWorker.js'
+ )
+ expect(pool.info).toStrictEqual({
+ type: PoolTypes.fixed,
+ minSize: numberOfWorkers,
+ maxSize: numberOfWorkers,
+ workerNodes: numberOfWorkers,
+ idleWorkerNodes: numberOfWorkers,
+ busyWorkerNodes: 0,
+ runningTasks: 0,
+ queuedTasks: 0,
+ maxQueuedTasks: 0
+ })
+ await pool.destroy()
+ pool = new DynamicClusterPool(
+ numberOfWorkers,
+ numberOfWorkers * 2,
+ './tests/worker-files/thread/testWorker.js'
+ )
+ expect(pool.info).toStrictEqual({
+ type: PoolTypes.dynamic,
+ minSize: numberOfWorkers,
+ maxSize: numberOfWorkers * 2,
+ workerNodes: numberOfWorkers,
+ idleWorkerNodes: numberOfWorkers,
+ busyWorkerNodes: 0,
+ runningTasks: 0,
+ queuedTasks: 0,
+ maxQueuedTasks: 0
+ })
+ await pool.destroy()
+ })
+
it('Simulate worker not found', async () => {
const pool = new StubPoolWithRemoveAllWorker(
numberOfWorkers,
expect(workerNode.tasksUsage.run).toBe(0)
expect(workerNode.tasksQueue.size).toBeGreaterThan(0)
}
- expect(queuePool.numberOfRunningTasks).toBe(numberOfWorkers)
- expect(queuePool.numberOfQueuedTasks).toBe(
+ expect(queuePool.info.runningTasks).toBe(numberOfWorkers)
+ expect(queuePool.info.queuedTasks).toBe(
+ numberOfWorkers * maxMultiplier - numberOfWorkers
+ )
+ expect(queuePool.info.maxQueuedTasks).toBe(
numberOfWorkers * maxMultiplier - numberOfWorkers
)
await Promise.all(promises)
expect(workerNode.tasksUsage.run).toBe(0)
expect(workerNode.tasksQueue.size).toBeGreaterThan(0)
}
- expect(queuePool.numberOfRunningTasks).toBe(numberOfThreads)
- expect(queuePool.numberOfQueuedTasks).toBe(
+ expect(queuePool.info.runningTasks).toBe(numberOfThreads)
+ expect(queuePool.info.queuedTasks).toBe(
+ numberOfThreads * maxMultiplier - numberOfThreads
+ )
+ expect(queuePool.info.maxQueuedTasks).toBe(
numberOfThreads * maxMultiplier - numberOfThreads
)
await Promise.all(promises)