median
} from '../utils'
import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
-import { PoolEvents, type PoolOptions } from './pool'
+import {
+ PoolEvents,
+ type IPool,
+ type PoolOptions,
+ type TasksQueueOptions,
+ PoolType
+} from './pool'
import { PoolEmitter } from './pool'
-import type { IPoolInternal } from './pool-internal'
-import { PoolType } from './pool-internal'
import type { IWorker, Task, TasksUsage, WorkerNode } from './worker'
import {
WorkerChoiceStrategies,
Worker extends IWorker,
Data = unknown,
Response = unknown
-> implements IPoolInternal<Worker, Data, Response> {
+> implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
this.opts.enableEvents = opts.enableEvents ?? true
this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
+ if (this.opts.enableTasksQueue) {
+ if ((opts.tasksQueueOptions?.concurrency as number) <= 0) {
+ throw new Error(
+ `Invalid worker tasks concurrency '${
+ (opts.tasksQueueOptions as TasksQueueOptions).concurrency as number
+ }'`
+ )
+ }
+ this.opts.tasksQueueOptions = {
+ concurrency: opts.tasksQueueOptions?.concurrency ?? 1
+ }
+ }
}
private checkValidWorkerChoiceStrategy (
)
}
- /** @inheritDoc */
- public abstract get full (): boolean
+ /**
+ * Whether the pool is full or not.
+ *
+ * The pool filling boolean status.
+ */
+ protected abstract get full (): boolean
- /** @inheritDoc */
- public abstract get busy (): boolean
+ /**
+ * Whether the pool is busy or not.
+ *
+ * The pool busyness boolean status.
+ */
+ protected abstract get busy (): boolean
protected internalBusy (): boolean {
return this.findFreeWorkerNodeKey() === -1
})
if (
this.opts.enableTasksQueue === true &&
- (this.busy || this.workerNodes[workerNodeKey].tasksUsage.running > 0)
+ (this.busy ||
+ this.workerNodes[workerNodeKey].tasksUsage.running >=
+ ((this.opts.tasksQueueOptions as TasksQueueOptions)
+ .concurrency as number))
) {
this.enqueueTask(workerNodeKey, submittedTask)
} else {