## [Unreleased]
+### Added
+
+- Add `queueMaxSize` option to tasks queue options.
+
## [2.6.31] - 2023-08-20
### Fixed
- `tasksQueueOptions` (optional) - The worker tasks queue options object to use in this pool.
Properties:
+ - `queueMaxSize` (optional) - The maximum number of tasks that can be queued on a worker before flagging it as back pressured. It must be a positive integer.
- `concurrency` (optional) - The maximum number of tasks that can be executed concurrently on a worker. It must be a positive integer.
- Default: `{ concurrency: 1 }`
+ Default: `{ queueMaxSize: (pool maximum size)^2, concurrency: 1 }`
#### `ThreadPoolOptions extends PoolOptions`
!Number.isSafeInteger(tasksQueueOptions.concurrency)
) {
throw new TypeError(
- 'Invalid worker tasks concurrency: must be an integer'
+ 'Invalid worker node tasks concurrency: must be an integer'
)
}
if (
tasksQueueOptions.concurrency <= 0
) {
throw new RangeError(
- `Invalid worker tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
+ `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
+ )
+ }
+ if (
+ tasksQueueOptions?.queueMaxSize != null &&
+ !Number.isSafeInteger(tasksQueueOptions.queueMaxSize)
+ ) {
+ throw new TypeError(
+ 'Invalid worker node tasks queue max size: must be an integer'
+ )
+ }
+ if (
+ tasksQueueOptions?.queueMaxSize != null &&
+ tasksQueueOptions.queueMaxSize <= 0
+ ) {
+ throw new RangeError(
+ `Invalid worker node tasks queue max size: ${tasksQueueOptions.queueMaxSize} is a negative integer or zero`
)
}
}
this.checkValidTasksQueueOptions(tasksQueueOptions)
this.opts.tasksQueueOptions =
this.buildTasksQueueOptions(tasksQueueOptions)
+ this.setTasksQueueMaxSize(
+ this.opts.tasksQueueOptions.queueMaxSize as number
+ )
} else if (this.opts.tasksQueueOptions != null) {
delete this.opts.tasksQueueOptions
}
}
+ private setTasksQueueMaxSize (queueMaxSize: number): void {
+ for (const workerNode of this.workerNodes) {
+ workerNode.tasksQueueBackPressureSize = queueMaxSize
+ }
+ }
+
private buildTasksQueueOptions (
tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
return {
- concurrency: tasksQueueOptions?.concurrency ?? 1
+ ...{
+ queueMaxSize: Math.pow(this.maxSize, 2),
+ concurrency: 1
+ },
+ ...tasksQueueOptions
}
}
const workerNode = new WorkerNode<Worker, Data>(
worker,
this.worker,
- this.maxSize
+ this.opts.tasksQueueOptions?.queueMaxSize ?? Math.pow(this.maxSize, 2)
)
// Flag the worker node as ready at pool startup.
if (this.starting) {
}
/**
- * Worker tasks queue options.
+ * Worker node tasks queue options.
*/
export interface TasksQueueOptions {
/**
- * Maximum number of tasks that can be executed concurrently on a worker.
+ * Maximum tasks queue size per worker node flagging it as back pressured.
+ *
+ * @defaultValue (pool maximum size)^2
+ */
+ readonly queueMaxSize?: number
+ /**
+ * Maximum number of tasks that can be executed concurrently on a worker node.
*
* @defaultValue 1
*/
*/
enableEvents?: boolean
/**
- * Pool worker tasks queue.
+ * Pool worker node tasks queue.
*
* @defaultValue false
*/
enableTasksQueue?: boolean
/**
- * Pool worker tasks queue options.
+ * Pool worker node tasks queue options.
*/
tasksQueueOptions?: TasksQueueOptions
}
* - '`destroy`': Emitted when the pool is destroyed.
* - `'error'`: Emitted when an uncaught error occurs.
* - `'taskError'`: Emitted when an error occurs while executing a task.
- * - `'backPressure'`: Emitted when all worker nodes have back pressure (i.e. their tasks queue is full: queue size \>= pool maximum size^2).
+ * - `'backPressure'`: Emitted when all worker nodes have back pressure (i.e. their tasks queue is full: queue size \>= max queue size).
*/
readonly emitter?: PoolEmitter
/**
workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
) => void
/**
- * Enables/disables the worker tasks queue in this pool.
+ * Enables/disables the worker node tasks queue in this pool.
*
- * @param enable - Whether to enable or disable the worker tasks queue.
- * @param tasksQueueOptions - The worker tasks queue options.
+ * @param enable - Whether to enable or disable the worker node tasks queue.
+ * @param tasksQueueOptions - The worker node tasks queue options.
*/
readonly enableTasksQueue: (
enable: boolean,
tasksQueueOptions?: TasksQueueOptions
) => void
/**
- * Sets the worker tasks queue options in this pool.
+ * Sets the worker node tasks queue options in this pool.
*
- * @param tasksQueueOptions - The worker tasks queue options.
+ * @param tasksQueueOptions - The worker node tasks queue options.
*/
readonly setTasksQueueOptions: (tasksQueueOptions: TasksQueueOptions) => void
}
public messageChannel?: MessageChannel
/** @inheritdoc */
public usage: WorkerUsage
+ /** @inheritdoc */
+ public tasksQueueBackPressureSize: number
private readonly taskFunctionsUsage: Map<string, WorkerUsage>
private readonly tasksQueue: Queue<Task<Data>>
- private readonly tasksQueueBackPressureSize: number
/**
* Constructs a new worker node.
*
* @param worker - The worker.
* @param workerType - The worker type.
- * @param poolMaxSize - The pool maximum size.
+ * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
*/
- constructor (worker: Worker, workerType: WorkerType, poolMaxSize: number) {
+ constructor (
+ worker: Worker,
+ workerType: WorkerType,
+ tasksQueueBackPressureSize: number
+ ) {
if (worker == null) {
throw new TypeError('Cannot construct a worker node without a worker')
}
'Cannot construct a worker node without a worker type'
)
}
- if (poolMaxSize == null) {
+ if (tasksQueueBackPressureSize == null) {
throw new TypeError(
- 'Cannot construct a worker node without a pool maximum size'
+ 'Cannot construct a worker node without a tasks queue back pressure size'
)
}
- if (!Number.isSafeInteger(poolMaxSize)) {
+ if (!Number.isSafeInteger(tasksQueueBackPressureSize)) {
throw new TypeError(
- 'Cannot construct a worker node with a pool maximum size that is not an integer'
+ 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
)
}
this.worker = worker
this.usage = this.initWorkerUsage()
this.taskFunctionsUsage = new Map<string, WorkerUsage>()
this.tasksQueue = new Queue<Task<Data>>()
- this.tasksQueueBackPressureSize = Math.pow(poolMaxSize, 2)
+ this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
}
/** @inheritdoc */
* Worker usage statistics.
*/
usage: WorkerUsage
+ /**
+ * Tasks queue back pressure size.
+ * This is the number of tasks that can be enqueued before the worker node has back pressure.
+ */
+ tasksQueueBackPressureSize: number
/**
* Tasks queue size.
*
expect(pool.opts.enableEvents).toBe(false)
expect(pool.opts.restartWorkerOnError).toBe(false)
expect(pool.opts.enableTasksQueue).toBe(true)
- expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
+ expect(pool.opts.tasksQueueOptions).toStrictEqual({
+ concurrency: 2,
+ queueMaxSize: 4
+ })
expect(pool.opts.workerChoiceStrategy).toBe(
WorkerChoiceStrategies.LEAST_USED
)
)
).toThrowError(
new RangeError(
- 'Invalid worker tasks concurrency: 0 is a negative integer or zero'
+ 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
)
)
expect(
}
)
).toThrowError(
- new TypeError('Invalid worker tasks concurrency: must be an integer')
+ new TypeError('Invalid worker node tasks concurrency: must be an integer')
)
})
expect(pool.opts.tasksQueueOptions).toBeUndefined()
pool.enableTasksQueue(true)
expect(pool.opts.enableTasksQueue).toBe(true)
- expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
+ expect(pool.opts.tasksQueueOptions).toStrictEqual({
+ concurrency: 1,
+ queueMaxSize: 4
+ })
pool.enableTasksQueue(true, { concurrency: 2 })
expect(pool.opts.enableTasksQueue).toBe(true)
- expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
+ expect(pool.opts.tasksQueueOptions).toStrictEqual({
+ concurrency: 2,
+ queueMaxSize: 4
+ })
pool.enableTasksQueue(false)
expect(pool.opts.enableTasksQueue).toBe(false)
expect(pool.opts.tasksQueueOptions).toBeUndefined()
'./tests/worker-files/thread/testWorker.js',
{ enableTasksQueue: true }
)
- expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
+ expect(pool.opts.tasksQueueOptions).toStrictEqual({
+ concurrency: 1,
+ queueMaxSize: 4
+ })
pool.setTasksQueueOptions({ concurrency: 2 })
- expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
+ expect(pool.opts.tasksQueueOptions).toStrictEqual({
+ concurrency: 2,
+ queueMaxSize: 4
+ })
expect(() =>
pool.setTasksQueueOptions('invalidTasksQueueOptions')
).toThrowError(
)
expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError(
new RangeError(
- 'Invalid worker tasks concurrency: 0 is a negative integer or zero'
+ 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
)
)
expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrowError(
new RangeError(
- 'Invalid worker tasks concurrency: -1 is a negative integer or zero'
+ 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
)
)
expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError(
- new TypeError('Invalid worker tasks concurrency: must be an integer')
+ new TypeError('Invalid worker node tasks concurrency: must be an integer')
+ )
+ expect(() => pool.setTasksQueueOptions({ queueMaxSize: 0 })).toThrowError(
+ new RangeError(
+ 'Invalid worker node tasks queue max size: 0 is a negative integer or zero'
+ )
+ )
+ expect(() => pool.setTasksQueueOptions({ queueMaxSize: -1 })).toThrowError(
+ new RangeError(
+ 'Invalid worker node tasks queue max size: -1 is a negative integer or zero'
+ )
+ )
+ expect(() => pool.setTasksQueueOptions({ queueMaxSize: 0.2 })).toThrowError(
+ new TypeError(
+ 'Invalid worker node tasks queue max size: must be an integer'
+ )
)
await pool.destroy()
})