feat: introduce worker node queue back pressure detection
authorJérôme Benoit <jerome.benoit@sap.com>
Thu, 17 Aug 2023 22:47:36 +0000 (00:47 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Thu, 17 Aug 2023 22:47:36 +0000 (00:47 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
src/pools/abstract-pool.ts
src/pools/pool.ts
src/pools/worker-node.ts
src/pools/worker.ts

index cba92d11f4ed684e8043bb43800cf7351b35a9c0..eb2c2f7818a687ac8f8016eb27489895cadfaaee 100644 (file)
@@ -1212,7 +1212,11 @@ export abstract class AbstractPool<
    * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
    */
   private addWorkerNode (worker: Worker): number {
-    const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
+    const workerNode = new WorkerNode<Worker, Data>(
+      worker,
+      this.worker,
+      this.maxSize
+    )
     // Flag the worker node as ready at pool startup.
     if (this.starting) {
       workerNode.info.ready = true
@@ -1250,6 +1254,15 @@ export abstract class AbstractPool<
   }
 
   private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
+    if (
+      this.opts.enableTasksQueue === true &&
+      this.workerNodes[workerNodeKey].hasBackPressure()
+    ) {
+      this.emitter?.emit(PoolEvents.backPressure, {
+        workerId: this.getWorkerInfo(workerNodeKey).id,
+        ...this.info
+      })
+    }
     return this.workerNodes[workerNodeKey].enqueueTask(task)
   }
 
index f4d9c7bba67d18d3c48a212159bf606fe409cfcf..26ca4c03279ad9969df3940141cf0bade3b815b4 100644 (file)
@@ -47,7 +47,8 @@ export const PoolEvents = Object.freeze({
   full: 'full',
   destroy: 'destroy',
   error: 'error',
-  taskError: 'taskError'
+  taskError: 'taskError',
+  backPressure: 'backPressure'
 } as const)
 
 /**
index a5482dcd29f6778b69c70bf322c2370be13918b3..64880fcb9820c098e37fb103bd27b8563073833a 100644 (file)
@@ -20,20 +20,26 @@ import {
  */
 export class WorkerNode<Worker extends IWorker, Data = unknown>
 implements IWorkerNode<Worker, Data> {
+  /** @inheritdoc */
   public readonly worker: Worker
+  /** @inheritdoc */
   public readonly info: WorkerInfo
+  /** @inheritdoc */
   public messageChannel?: MessageChannel
+  /** @inheritdoc */
   public usage: WorkerUsage
   private readonly tasksUsage: Map<string, WorkerUsage>
   private readonly tasksQueue: Queue<Task<Data>>
+  private readonly tasksQueueBackPressureMaxSize: number
 
   /**
    * Constructs a new worker node.
    *
    * @param worker - The worker.
    * @param workerType - The worker type.
+   * @param poolMaxSize - The pool maximum size.
    */
-  constructor (worker: Worker, workerType: WorkerType) {
+  constructor (worker: Worker, workerType: WorkerType, poolMaxSize: number) {
     this.worker = worker
     this.info = this.initWorkerInfo(worker, workerType)
     if (workerType === WorkerTypes.thread) {
@@ -42,6 +48,7 @@ implements IWorkerNode<Worker, Data> {
     this.usage = this.initWorkerUsage()
     this.tasksUsage = new Map<string, WorkerUsage>()
     this.tasksQueue = new Queue<Task<Data>>()
+    this.tasksQueueBackPressureMaxSize = Math.pow(poolMaxSize, 2)
   }
 
   /** @inheritdoc */
@@ -73,6 +80,11 @@ implements IWorkerNode<Worker, Data> {
     this.tasksQueue.clear()
   }
 
+  /** @inheritdoc */
+  public hasBackPressure (): boolean {
+    return this.tasksQueueSize() >= this.tasksQueueBackPressureMaxSize
+  }
+
   /** @inheritdoc */
   public resetUsage (): void {
     this.usage = this.initWorkerUsage()
index 09253242f7d0af88204ba9989fb37a7fa12cc287..cd395bbdd0301948937bc7c1ba06d3f2f68d540f 100644 (file)
@@ -2,6 +2,11 @@ import type { MessageChannel } from 'node:worker_threads'
 import type { CircularArray } from '../circular-array'
 import type { Task } from '../utility-types'
 
+/**
+ * Callback invoked when the worker has started successfully.
+ */
+export type OnlineHandler<Worker extends IWorker> = (this: Worker) => void
+
 /**
  * Callback invoked if the worker has received a message.
  */
@@ -18,11 +23,6 @@ export type ErrorHandler<Worker extends IWorker> = (
   error: Error
 ) => void
 
-/**
- * Callback invoked when the worker has started successfully.
- */
-export type OnlineHandler<Worker extends IWorker> = (this: Worker) => void
-
 /**
  * Callback invoked when the worker exits successfully.
  */
@@ -242,6 +242,12 @@ export interface IWorkerNode<Worker extends IWorker, Data = unknown> {
    * Clears tasks queue.
    */
   readonly clearTasksQueue: () => void
+  /**
+   * Whether the worker node has back pressure.
+   *
+   * @returns `true` if the worker node has back pressure, `false` otherwise.
+   */
+  readonly hasBackPressure: () => boolean
   /**
    * Resets usage statistics .
    */