Properties:
- `medRunTime` (optional) - Use the tasks median run time instead of the tasks average run time in worker choice strategies.
- Default: { medRunTime: false }
+
+ Default: { medRunTime: false }
- `enableEvents` (optional) - Events emission enablement in this pool. Default: true
- `enableTasksQueue` (optional, experimental) - Tasks queue per worker enablement in this pool. Default: false
+- `tasksQueueOptions` (optional, experimental) - The worker tasks queue options object to use in this pool.
+ Properties:
+
+ - `concurrency` (optional) - The maximum number of tasks that can be executed concurrently on a worker.
+
+ Default: { concurrency: 1 }
+
### `pool = new DynamicThreadPool/DynamicClusterPool(min, max, filePath, opts)`
`min` (mandatory) Same as FixedThreadPool/FixedClusterPool numberOfThreads/numberOfWorkers, this number of workers will be always active
median
} from '../utils'
import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
-import { PoolEvents, type PoolOptions } from './pool'
+import { PoolEvents, type PoolOptions, type TasksQueueOptions } from './pool'
import { PoolEmitter } from './pool'
import type { IPoolInternal } from './pool-internal'
import { PoolType } from './pool-internal'
opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
this.opts.enableEvents = opts.enableEvents ?? true
this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
+ if (this.opts.enableTasksQueue) {
+ if ((opts.tasksQueueOptions?.concurrency as number) <= 0) {
+ throw new Error(
+ `Invalid tasks queue concurrency '${
+ (opts.tasksQueueOptions as TasksQueueOptions).concurrency as number
+ }'`
+ )
+ }
+ this.opts.tasksQueueOptions = {
+ concurrency: opts.tasksQueueOptions?.concurrency ?? 1
+ }
+ }
}
private checkValidWorkerChoiceStrategy (
})
if (
this.opts.enableTasksQueue === true &&
- (this.busy || this.workerNodes[workerNodeKey].tasksUsage.running > 0)
+ (this.busy ||
+ this.workerNodes[workerNodeKey].tasksUsage.running >
+ ((this.opts.tasksQueueOptions as TasksQueueOptions)
+ .concurrency as number) -
+ 1)
) {
this.enqueueTask(workerNodeKey, submittedTask)
} else {
*/
export type PoolEvent = keyof typeof PoolEvents
+/**
+ * Worker tasks queue options.
+ */
+export interface TasksQueueOptions {
+ /**
+ * Maximum number of tasks that can be executed concurrently on a worker.
+ *
+ * @defaultValue 1
+ */
+ concurrency?: number
+}
+
/**
* Options for a poolifier pool.
*/
* @defaultValue false
*/
enableTasksQueue?: boolean
+ /**
+ * Pool worker tasks queue options.
+ *
+ * @experimental
+ * @defaultValue \{ concurrency: 1 \}
+ */
+ tasksQueueOptions?: TasksQueueOptions
}
/**