X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fworker-node.ts;h=116fb8448d8435ecb98c5573bb781f258026e589;hb=255d48dfda3aae693d228d0c1caea6f9ab296b8b;hp=359dc26c1c3649380572c7d6609961b534225863;hpb=36f8b943edd527d5111e783b6dce3c4b3f9a53b3;p=poolifier.git diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index 359dc26c..116fb844 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -1,7 +1,7 @@ import { MessageChannel } from 'node:worker_threads' import { CircularArray } from '../circular-array' import type { Task } from '../utility-types' -import { DEFAULT_TASK_NAME } from '../utils' +import { DEFAULT_TASK_NAME, once } from '../utils' import { Deque } from '../deque' import { type IWorker, @@ -32,6 +32,8 @@ implements IWorkerNode { public tasksQueueBackPressureSize: number /** @inheritdoc */ public onBackPressure?: (workerId: number) => void + /** @inheritdoc */ + public onEmptyQueue?: (workerId: number) => void private readonly taskFunctionsUsage: Map private readonly tasksQueue: Deque> @@ -81,20 +83,11 @@ implements IWorkerNode { return this.tasksQueue.size } - /** - * Tasks queue maximum size. - * - * @returns The tasks queue maximum size. - */ - private tasksQueueMaxSize (): number { - return this.tasksQueue.maxSize - } - /** @inheritdoc */ public enqueueTask (task: Task): number { const tasksQueueSize = this.tasksQueue.push(task) if (this.onBackPressure != null && this.hasBackPressure()) { - this.once(this.onBackPressure)(this.info.id as number) + once(this.onBackPressure, this)(this.info.id as number) } return tasksQueueSize } @@ -103,19 +96,27 @@ implements IWorkerNode { public unshiftTask (task: Task): number { const tasksQueueSize = this.tasksQueue.unshift(task) if (this.onBackPressure != null && this.hasBackPressure()) { - this.once(this.onBackPressure)(this.info.id as number) + once(this.onBackPressure, this)(this.info.id as number) } return tasksQueueSize } /** @inheritdoc */ public dequeueTask (): Task | undefined { - return this.tasksQueue.shift() + const task = this.tasksQueue.shift() + if (this.onEmptyQueue != null && this.tasksQueue.size === 0) { + once(this.onEmptyQueue, this)(this.info.id as number) + } + return task } /** @inheritdoc */ public popTask (): Task | undefined { - return this.tasksQueue.pop() + const task = this.tasksQueue.pop() + if (this.onEmptyQueue != null && this.tasksQueue.size === 0) { + once(this.onEmptyQueue, this)(this.info.id as number) + } + return task } /** @inheritdoc */ @@ -180,10 +181,10 @@ implements IWorkerNode { private initWorkerUsage (): WorkerUsage { const getTasksQueueSize = (): number => { - return this.tasksQueueSize() + return this.tasksQueue.size } const getTasksQueueMaxSize = (): number => { - return this.tasksQueueMaxSize() + return this.tasksQueue.maxSize } return { tasks: { @@ -271,28 +272,4 @@ implements IWorkerNode { return worker.id } } - - /** - * Executes a function once at a time. - * - * @param fn - The function to execute. - * @param context - The context to bind the function to. - * @returns The function to execute. - */ - private once ( - // eslint-disable-next-line @typescript-eslint/no-explicit-any - fn: (...args: any[]) => void, - context = this - // eslint-disable-next-line @typescript-eslint/no-explicit-any - ): (...args: any[]) => void { - let called = false - // eslint-disable-next-line @typescript-eslint/no-explicit-any - return function (...args: any[]): void { - if (!called) { - called = true - fn.apply(context, args) - called = false - } - } - } }