type IPool,
PoolEmitter,
PoolEvents,
+ type PoolInfo,
type PoolOptions,
- PoolType,
- type TasksQueueOptions
+ type PoolType,
+ PoolTypes,
+ type TasksQueueOptions,
+ type WorkerType
} from './pool'
import type { IWorker, Task, TasksUsage, WorkerNode } from './worker'
import {
throw new RangeError(
'Cannot instantiate a pool with a negative number of workers'
)
- } else if (this.type === PoolType.FIXED && numberOfWorkers === 0) {
+ } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
throw new Error('Cannot instantiate a fixed pool with no worker')
}
}
this.checkValidWorkerChoiceStrategyOptions(
this.opts.workerChoiceStrategyOptions
)
+ this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
this.opts.enableEvents = opts.enableEvents ?? true
this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
if (this.opts.enableTasksQueue) {
}
if (
workerChoiceStrategyOptions.weights != null &&
- Object.keys(workerChoiceStrategyOptions.weights).length !== this.size
+ Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
) {
throw new Error(
'Invalid worker choice strategy options: must have a weight for each worker node'
public abstract get type (): PoolType
/** @inheritDoc */
- public abstract get size (): number
+ public get info (): PoolInfo {
+ return {
+ type: this.type,
+ worker: this.worker,
+ minSize: this.minSize,
+ maxSize: this.maxSize,
+ workerNodes: this.workerNodes.length,
+ idleWorkerNodes: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ workerNode.tasksUsage.running === 0 ? accumulator + 1 : accumulator,
+ 0
+ ),
+ busyWorkerNodes: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ workerNode.tasksUsage.running > 0 ? accumulator + 1 : accumulator,
+ 0
+ ),
+ runningTasks: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + workerNode.tasksUsage.running,
+ 0
+ ),
+ queuedTasks: this.workerNodes.reduce(
+ (accumulator, workerNode) => accumulator + workerNode.tasksQueue.size,
+ 0
+ ),
+ maxQueuedTasks: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + workerNode.tasksQueue.maxSize,
+ 0
+ )
+ }
+ }
/**
- * Number of tasks running in the pool.
+ * Gets the worker type.
*/
- private get numberOfRunningTasks (): number {
- return this.workerNodes.reduce(
- (accumulator, workerNode) => accumulator + workerNode.tasksUsage.running,
- 0
- )
- }
+ protected abstract get worker (): WorkerType
/**
- * Number of tasks queued in the pool.
+ * Pool minimum size.
*/
- private get numberOfQueuedTasks (): number {
- if (this.opts.enableTasksQueue === false) {
- return 0
- }
- return this.workerNodes.reduce(
- (accumulator, workerNode) => accumulator + workerNode.tasksQueue.size,
- 0
- )
- }
+ protected abstract get minSize (): number
+
+ /**
+ * Pool maximum size.
+ */
+ protected abstract get maxSize (): number
/**
* Gets the given worker its worker node key.
++workerTasksUsage.error
}
this.updateRunTimeTasksUsage(workerTasksUsage, message)
- this.updateWaitTasksUsage(workerTasksUsage, message)
+ this.updateWaitTimeTasksUsage(workerTasksUsage, message)
}
private updateRunTimeTasksUsage (
}
}
- private updateWaitTasksUsage (
+ private updateWaitTimeTasksUsage (
workerTasksUsage: TasksUsage,
message: MessageValue<Response>
): void {
*/
protected chooseWorkerNode (): number {
let workerNodeKey: number
- if (this.type === PoolType.DYNAMIC && !this.full && this.internalBusy()) {
+ if (this.type === PoolTypes.dynamic && !this.full && this.internalBusy()) {
const workerCreated = this.createAndSetupWorker()
this.registerWorkerMessageListener(workerCreated, message => {
const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated)
worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
+ worker.on('error', error => {
+ if (this.emitter != null) {
+ this.emitter.emit(PoolEvents.error, error)
+ }
+ })
+ if (this.opts.restartWorkerOnError === true) {
+ worker.on('error', () => {
+ this.createAndSetupWorker()
+ })
+ }
worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
worker.once('exit', () => {
if (promiseResponse != null) {
if (message.error != null) {
promiseResponse.reject(message.error)
+ if (this.emitter != null) {
+ this.emitter.emit(PoolEvents.taskError, {
+ error: message.error,
+ errorData: message.errorData
+ })
+ }
} else {
promiseResponse.resolve(message.data as Response)
}
}
private checkAndEmitEvents (): void {
- if (this.opts.enableEvents === true) {
+ if (this.emitter != null) {
if (this.busy) {
- this.emitter?.emit(PoolEvents.busy)
+ this.emitter?.emit(PoolEvents.busy, this.info)
}
- if (this.type === PoolType.DYNAMIC && this.full) {
- this.emitter?.emit(PoolEvents.full)
+ if (this.type === PoolTypes.dynamic && this.full) {
+ this.emitter?.emit(PoolEvents.full, this.info)
}
}
}
*/
private removeWorkerNode (worker: Worker): void {
const workerNodeKey = this.getWorkerNodeKey(worker)
- this.workerNodes.splice(workerNodeKey, 1)
- this.workerChoiceStrategyContext.remove(workerNodeKey)
+ if (workerNodeKey !== -1) {
+ this.workerNodes.splice(workerNodeKey, 1)
+ this.workerChoiceStrategyContext.remove(workerNodeKey)
+ }
}
private executeTask (workerNodeKey: number, task: Task<Data>): void {