X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=99f65a24a0ee85f606ab7937653cf0b641472bc8;hb=243a550a93e278669fe5602aeba92dc8ba11260e;hp=fea4847d685c1c60433b55a321c83f30dee20abf;hpb=0527b6db895355140ed6fa2f740caab3a41b2f9f;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index fea4847d..99f65a24 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1,8 +1,12 @@ import crypto from 'node:crypto' import type { MessageValue, PromiseResponseWrapper } from '../utility-types' -import { EMPTY_FUNCTION, median } from '../utils' +import { + DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, + EMPTY_FUNCTION, + median +} from '../utils' import { KillBehaviors, isKillBehavior } from '../worker/worker-options' -import { PoolEvents, type PoolOptions } from './pool' +import { PoolEvents, type PoolOptions, type TasksQueueOptions } from './pool' import { PoolEmitter } from './pool' import type { IPoolInternal } from './pool-internal' import { PoolType } from './pool-internal' @@ -132,9 +136,21 @@ export abstract class AbstractPool< opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy) this.opts.workerChoiceStrategyOptions = - opts.workerChoiceStrategyOptions ?? { medRunTime: false } + opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS this.opts.enableEvents = opts.enableEvents ?? true this.opts.enableTasksQueue = opts.enableTasksQueue ?? false + if (this.opts.enableTasksQueue) { + if ((opts.tasksQueueOptions?.concurrency as number) <= 0) { + throw new Error( + `Invalid worker tasks concurrency '${ + (opts.tasksQueueOptions as TasksQueueOptions).concurrency as number + }'` + ) + } + this.opts.tasksQueueOptions = { + concurrency: opts.tasksQueueOptions?.concurrency ?? 1 + } + } } private checkValidWorkerChoiceStrategy ( @@ -241,7 +257,11 @@ export abstract class AbstractPool< }) if ( this.opts.enableTasksQueue === true && - (this.busy || this.workerNodes[workerNodeKey].tasksUsage.running > 0) + (this.busy || + this.workerNodes[workerNodeKey].tasksUsage.running > + ((this.opts.tasksQueueOptions as TasksQueueOptions) + .concurrency as number) - + 1) ) { this.enqueueTask(workerNodeKey, submittedTask) } else { @@ -544,8 +564,8 @@ export abstract class AbstractPool< this.sendToWorker(this.workerNodes[workerNodeKey].worker, task) } - private enqueueTask (workerNodeKey: number, task: Task): void { - this.workerNodes[workerNodeKey].tasksQueue.push(task) + private enqueueTask (workerNodeKey: number, task: Task): number { + return this.workerNodes[workerNodeKey].tasksQueue.push(task) } private dequeueTask (workerNodeKey: number): Task | undefined {