X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fworker-node.ts;h=e28372efa1b3c73cf9c8904d6077f87fed1dd553;hb=68cbdc846878bc058323b757a68b4c83eedc6388;hp=e79aed0bc3aa68b880d0d265bca4fb2ecf8a9bff;hpb=4b280f8150c7daa56c62731eb9441b888a45dad4;p=poolifier.git diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index e79aed0b..e28372ef 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -1,7 +1,12 @@ import { MessageChannel } from 'node:worker_threads' import { CircularArray } from '../circular-array' import type { Task } from '../utility-types' -import { DEFAULT_TASK_NAME } from '../utils' +import { + DEFAULT_TASK_NAME, + EMPTY_FUNCTION, + exponentialDelay, + sleep +} from '../utils' import { Deque } from '../deque' import { type IWorker, @@ -36,6 +41,7 @@ implements IWorkerNode { public onEmptyQueue?: (workerId: number) => void private readonly taskFunctionsUsage: Map private readonly tasksQueue: Deque> + private onEmptyQueueCount: number /** * Constructs a new worker node. @@ -76,6 +82,7 @@ implements IWorkerNode { this.taskFunctionsUsage = new Map() this.tasksQueue = new Deque>() this.tasksQueueBackPressureSize = tasksQueueBackPressureSize + this.onEmptyQueueCount = 0 } /** @inheritdoc */ @@ -105,7 +112,7 @@ implements IWorkerNode { public dequeueTask (): Task | undefined { const task = this.tasksQueue.shift() if (this.onEmptyQueue != null && this.tasksQueue.size === 0) { - this.onEmptyQueue(this.info.id as number) + this.startOnEmptyQueue().catch(EMPTY_FUNCTION) } return task } @@ -114,7 +121,7 @@ implements IWorkerNode { public popTask (): Task | undefined { const task = this.tasksQueue.pop() if (this.onEmptyQueue != null && this.tasksQueue.size === 0) { - this.onEmptyQueue(this.info.id as number) + this.startOnEmptyQueue().catch(EMPTY_FUNCTION) } return task } @@ -170,6 +177,19 @@ implements IWorkerNode { return this.taskFunctionsUsage.get(name) } + private async startOnEmptyQueue (): Promise { + if (this.onEmptyQueue != null) { + if (this.tasksQueue.size > 0) { + this.onEmptyQueueCount = 0 + return + } + this.onEmptyQueue(this.info.id as number) + ++this.onEmptyQueueCount + await sleep(exponentialDelay(this.onEmptyQueueCount)) + await this.startOnEmptyQueue() + } + } + private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo { return { id: this.getWorkerId(worker, workerType), @@ -196,6 +216,7 @@ implements IWorkerNode { get maxQueued (): number { return getTasksQueueMaxSize() }, + stolen: 0, failed: 0 }, runTime: { @@ -236,6 +257,7 @@ implements IWorkerNode { get queued (): number { return getTaskFunctionQueueSize() }, + stolen: 0, failed: 0 }, runTime: {