median
} from '../utils'
import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
-import { PoolEvents, type PoolOptions, type TasksQueueOptions } from './pool'
+import {
+ PoolEvents,
+ type IPool,
+ type PoolOptions,
+ type TasksQueueOptions,
+ PoolType
+} from './pool'
import { PoolEmitter } from './pool'
-import type { IPoolInternal } from './pool-internal'
-import { PoolType } from './pool-internal'
import type { IWorker, Task, TasksUsage, WorkerNode } from './worker'
import {
WorkerChoiceStrategies,
- type WorkerChoiceStrategy
+ type WorkerChoiceStrategy,
+ type WorkerChoiceStrategyOptions
} from './selection-strategies/selection-strategies-types'
import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
import { CircularArray } from '../circular-array'
*
* @typeParam Worker - Type of worker which manages this pool.
* @typeParam Data - Type of data sent to the worker. This can only be serializable data.
- * @typeParam Response - Type of response of execution. This can only be serializable data.
+ * @typeParam Response - Type of execution response. This can only be serializable data.
*/
export abstract class AbstractPool<
Worker extends IWorker,
Data = unknown,
Response = unknown
-> implements IPoolInternal<Worker, Data, Response> {
+> implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
* Constructs a new poolifier pool.
*
* @param numberOfWorkers - Number of workers that this pool should manage.
- * @param filePath - Path to the worker-file.
+ * @param filePath - Path to the worker file.
* @param opts - Options for the pool.
*/
public constructor (
this.opts.enableEvents = opts.enableEvents ?? true
this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
if (this.opts.enableTasksQueue) {
- if ((opts.tasksQueueOptions?.concurrency as number) <= 0) {
- throw new Error(
- `Invalid worker tasks concurrency '${
- (opts.tasksQueueOptions as TasksQueueOptions).concurrency as number
- }'`
- )
- }
- this.opts.tasksQueueOptions = {
- concurrency: opts.tasksQueueOptions?.concurrency ?? 1
- }
+ this.checkValidTasksQueueOptions(
+ opts.tasksQueueOptions as TasksQueueOptions
+ )
+ this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
+ opts.tasksQueueOptions as TasksQueueOptions
+ )
}
}
}
}
+ private checkValidTasksQueueOptions (
+ tasksQueueOptions: TasksQueueOptions
+ ): void {
+ if ((tasksQueueOptions?.concurrency as number) <= 0) {
+ throw new Error(
+ `Invalid worker tasks concurrency '${
+ tasksQueueOptions.concurrency as number
+ }'`
+ )
+ }
+ }
+
/** @inheritDoc */
public abstract get type (): PoolType
/** @inheritDoc */
public setWorkerChoiceStrategy (
- workerChoiceStrategy: WorkerChoiceStrategy
+ workerChoiceStrategy: WorkerChoiceStrategy,
+ workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
this.opts.workerChoiceStrategy = workerChoiceStrategy
})
}
this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
- workerChoiceStrategy
+ this.opts.workerChoiceStrategy
+ )
+ if (workerChoiceStrategyOptions != null) {
+ this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
+ }
+ }
+
+ /** @inheritDoc */
+ public setWorkerChoiceStrategyOptions (
+ workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
+ ): void {
+ this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
+ this.workerChoiceStrategyContext.setOptions(
+ this.opts.workerChoiceStrategyOptions
)
}
/** @inheritDoc */
- public abstract get full (): boolean
+ public enableTasksQueue (
+ enable: boolean,
+ tasksQueueOptions?: TasksQueueOptions
+ ): void {
+ if (this.opts.enableTasksQueue === true && !enable) {
+ for (const [workerNodeKey] of this.workerNodes.entries()) {
+ this.flushTasksQueue(workerNodeKey)
+ }
+ }
+ this.opts.enableTasksQueue = enable
+ this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
+ }
/** @inheritDoc */
- public abstract get busy (): boolean
+ public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
+ if (this.opts.enableTasksQueue === true) {
+ this.checkValidTasksQueueOptions(tasksQueueOptions)
+ this.opts.tasksQueueOptions =
+ this.buildTasksQueueOptions(tasksQueueOptions)
+ } else {
+ delete this.opts.tasksQueueOptions
+ }
+ }
+
+ private buildTasksQueueOptions (
+ tasksQueueOptions: TasksQueueOptions
+ ): TasksQueueOptions {
+ return {
+ concurrency: tasksQueueOptions?.concurrency ?? 1
+ }
+ }
+
+ /**
+ * Whether the pool is full or not.
+ *
+ * The pool filling boolean status.
+ */
+ protected abstract get full (): boolean
+
+ /**
+ * Whether the pool is busy or not.
+ *
+ * The pool busyness boolean status.
+ */
+ protected abstract get busy (): boolean
protected internalBusy (): boolean {
return this.findFreeWorkerNodeKey() === -1
id: crypto.randomUUID()
}
const res = new Promise<Response>((resolve, reject) => {
- this.promiseResponseMap.set(submittedTask.id, {
+ this.promiseResponseMap.set(submittedTask.id as string, {
resolve,
reject,
worker: workerNode.worker
if (
this.opts.enableTasksQueue === true &&
(this.busy ||
- this.workerNodes[workerNodeKey].tasksUsage.running >
+ this.workerNodes[workerNodeKey].tasksUsage.running >=
((this.opts.tasksQueueOptions as TasksQueueOptions)
- .concurrency as number) -
- 1)
+ .concurrency as number))
) {
this.enqueueTask(workerNodeKey, submittedTask)
} else {
* Gets the given worker its tasks usage in the pool.
*
* @param worker - The worker.
+ * @throws Error if the worker is not found in the pool worker nodes.
* @returns The worker tasks usage.
*/
private getWorkerTasksUsage (worker: Worker): TasksUsage | undefined {