From 671d515455c745dc74f4c385fe23683975bfc3df Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Fri, 18 Aug 2023 00:47:36 +0200 Subject: [PATCH] feat: introduce worker node queue back pressure detection MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 15 ++++++++++++++- src/pools/pool.ts | 3 ++- src/pools/worker-node.ts | 14 +++++++++++++- src/pools/worker.ts | 16 +++++++++++----- 4 files changed, 40 insertions(+), 8 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index cba92d11..eb2c2f78 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1212,7 +1212,11 @@ export abstract class AbstractPool< * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found. */ private addWorkerNode (worker: Worker): number { - const workerNode = new WorkerNode(worker, this.worker) + const workerNode = new WorkerNode( + worker, + this.worker, + this.maxSize + ) // Flag the worker node as ready at pool startup. if (this.starting) { workerNode.info.ready = true @@ -1250,6 +1254,15 @@ export abstract class AbstractPool< } private enqueueTask (workerNodeKey: number, task: Task): number { + if ( + this.opts.enableTasksQueue === true && + this.workerNodes[workerNodeKey].hasBackPressure() + ) { + this.emitter?.emit(PoolEvents.backPressure, { + workerId: this.getWorkerInfo(workerNodeKey).id, + ...this.info + }) + } return this.workerNodes[workerNodeKey].enqueueTask(task) } diff --git a/src/pools/pool.ts b/src/pools/pool.ts index f4d9c7bb..26ca4c03 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -47,7 +47,8 @@ export const PoolEvents = Object.freeze({ full: 'full', destroy: 'destroy', error: 'error', - taskError: 'taskError' + taskError: 'taskError', + backPressure: 'backPressure' } as const) /** diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index a5482dcd..64880fcb 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -20,20 +20,26 @@ import { */ export class WorkerNode implements IWorkerNode { + /** @inheritdoc */ public readonly worker: Worker + /** @inheritdoc */ public readonly info: WorkerInfo + /** @inheritdoc */ public messageChannel?: MessageChannel + /** @inheritdoc */ public usage: WorkerUsage private readonly tasksUsage: Map private readonly tasksQueue: Queue> + private readonly tasksQueueBackPressureMaxSize: number /** * Constructs a new worker node. * * @param worker - The worker. * @param workerType - The worker type. + * @param poolMaxSize - The pool maximum size. */ - constructor (worker: Worker, workerType: WorkerType) { + constructor (worker: Worker, workerType: WorkerType, poolMaxSize: number) { this.worker = worker this.info = this.initWorkerInfo(worker, workerType) if (workerType === WorkerTypes.thread) { @@ -42,6 +48,7 @@ implements IWorkerNode { this.usage = this.initWorkerUsage() this.tasksUsage = new Map() this.tasksQueue = new Queue>() + this.tasksQueueBackPressureMaxSize = Math.pow(poolMaxSize, 2) } /** @inheritdoc */ @@ -73,6 +80,11 @@ implements IWorkerNode { this.tasksQueue.clear() } + /** @inheritdoc */ + public hasBackPressure (): boolean { + return this.tasksQueueSize() >= this.tasksQueueBackPressureMaxSize + } + /** @inheritdoc */ public resetUsage (): void { this.usage = this.initWorkerUsage() diff --git a/src/pools/worker.ts b/src/pools/worker.ts index 09253242..cd395bbd 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -2,6 +2,11 @@ import type { MessageChannel } from 'node:worker_threads' import type { CircularArray } from '../circular-array' import type { Task } from '../utility-types' +/** + * Callback invoked when the worker has started successfully. + */ +export type OnlineHandler = (this: Worker) => void + /** * Callback invoked if the worker has received a message. */ @@ -18,11 +23,6 @@ export type ErrorHandler = ( error: Error ) => void -/** - * Callback invoked when the worker has started successfully. - */ -export type OnlineHandler = (this: Worker) => void - /** * Callback invoked when the worker exits successfully. */ @@ -242,6 +242,12 @@ export interface IWorkerNode { * Clears tasks queue. */ readonly clearTasksQueue: () => void + /** + * Whether the worker node has back pressure. + * + * @returns `true` if the worker node has back pressure, `false` otherwise. + */ + readonly hasBackPressure: () => boolean /** * Resets usage statistics . */ -- 2.34.1