* @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
*/
private addWorkerNode (worker: Worker): number {
- const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
+ const workerNode = new WorkerNode<Worker, Data>(
+ worker,
+ this.worker,
+ this.maxSize
+ )
// Flag the worker node as ready at pool startup.
if (this.starting) {
workerNode.info.ready = true
}
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
+ if (
+ this.opts.enableTasksQueue === true &&
+ this.workerNodes[workerNodeKey].hasBackPressure()
+ ) {
+ this.emitter?.emit(PoolEvents.backPressure, {
+ workerId: this.getWorkerInfo(workerNodeKey).id,
+ ...this.info
+ })
+ }
return this.workerNodes[workerNodeKey].enqueueTask(task)
}
full: 'full',
destroy: 'destroy',
error: 'error',
- taskError: 'taskError'
+ taskError: 'taskError',
+ backPressure: 'backPressure'
} as const)
/**
*/
export class WorkerNode<Worker extends IWorker, Data = unknown>
implements IWorkerNode<Worker, Data> {
+ /** @inheritdoc */
public readonly worker: Worker
+ /** @inheritdoc */
public readonly info: WorkerInfo
+ /** @inheritdoc */
public messageChannel?: MessageChannel
+ /** @inheritdoc */
public usage: WorkerUsage
private readonly tasksUsage: Map<string, WorkerUsage>
private readonly tasksQueue: Queue<Task<Data>>
+ private readonly tasksQueueBackPressureMaxSize: number
/**
* Constructs a new worker node.
*
* @param worker - The worker.
* @param workerType - The worker type.
+ * @param poolMaxSize - The pool maximum size.
*/
- constructor (worker: Worker, workerType: WorkerType) {
+ constructor (worker: Worker, workerType: WorkerType, poolMaxSize: number) {
this.worker = worker
this.info = this.initWorkerInfo(worker, workerType)
if (workerType === WorkerTypes.thread) {
this.usage = this.initWorkerUsage()
this.tasksUsage = new Map<string, WorkerUsage>()
this.tasksQueue = new Queue<Task<Data>>()
+ this.tasksQueueBackPressureMaxSize = Math.pow(poolMaxSize, 2)
}
/** @inheritdoc */
this.tasksQueue.clear()
}
+ /** @inheritdoc */
+ public hasBackPressure (): boolean {
+ return this.tasksQueueSize() >= this.tasksQueueBackPressureMaxSize
+ }
+
/** @inheritdoc */
public resetUsage (): void {
this.usage = this.initWorkerUsage()
import type { CircularArray } from '../circular-array'
import type { Task } from '../utility-types'
+/**
+ * Callback invoked when the worker has started successfully.
+ */
+export type OnlineHandler<Worker extends IWorker> = (this: Worker) => void
+
/**
* Callback invoked if the worker has received a message.
*/
error: Error
) => void
-/**
- * Callback invoked when the worker has started successfully.
- */
-export type OnlineHandler<Worker extends IWorker> = (this: Worker) => void
-
/**
* Callback invoked when the worker exits successfully.
*/
* Clears tasks queue.
*/
readonly clearTasksQueue: () => void
+ /**
+ * Whether the worker node has back pressure.
+ *
+ * @returns `true` if the worker node has back pressure, `false` otherwise.
+ */
+ readonly hasBackPressure: () => boolean
/**
* Resets usage statistics .
*/