## [Unreleased]
+### Added
+
+- Add tasks queue enablement runtime setter to pool.
+- Add tasks queue options runtime setter to pool.
+- Add worker choice strategy options runtime setter to pool.
+
+### Changed
+
+- Remove the tasks queuing experimental status.
+
### Fixed
- Fix worker function type definition and validation.
"eslint": "^8.38.0",
"eslint-config-standard": "^17.0.0",
"eslint-config-standard-with-typescript": "^34.0.1",
- "eslint-define-config": "^1.17.0",
+ "eslint-define-config": "^1.18.0",
"eslint-import-resolver-typescript": "^3.5.5",
"eslint-plugin-import": "^2.27.5",
"eslint-plugin-jsdoc": "^41.1.1",
specifier: ^34.0.1
version: 34.0.1(@typescript-eslint/eslint-plugin@5.58.0)(eslint-plugin-import@2.27.5)(eslint-plugin-n@15.7.0)(eslint-plugin-promise@6.1.1)(eslint@8.38.0)(typescript@5.0.4)
eslint-define-config:
- specifier: ^1.17.0
- version: 1.17.0
+ specifier: ^1.18.0
+ version: 1.18.0
eslint-import-resolver-typescript:
specifier: ^3.5.5
version: 3.5.5(@typescript-eslint/parser@5.58.0)(eslint-plugin-import@2.27.5)(eslint@8.38.0)
optional: true
dependencies:
'@rollup/pluginutils': 5.0.2(rollup@3.20.2)
- resolve: 1.22.2
+ resolve: 1.22.3
rollup: 3.20.2
typescript: 5.0.4
dev: true
resolution: {integrity: sha512-3tlv/dIP7FWvj3BsbHrGLJ6l/oKh1O3TcgBqMn+yyCagOxc23fyzDS6HypQbgxWbkpDnf52p1LuR4eWDQ/K9WQ==}
dev: true
- /commander@10.0.0:
- resolution: {integrity: sha512-zS5PnTI22FIRM6ylNW8G4Ap0IEOyk62fhLSD0+uHRT9McRCLGpkVNvao4bjimpK/GShynyQkFFxHhwMcETmduA==}
+ /commander@10.0.1:
+ resolution: {integrity: sha512-y4Mg2tXshplEbSGzx7amzPwKKOCGuoSRP/CjEdwwk0FOGlUbq6lKuoyDZTNZkmxHdJtp54hdfY/JUrdL7Xfdug==}
engines: {node: '>=14'}
dev: true
eslint-plugin-promise: 6.1.1(eslint@8.38.0)
dev: true
- /eslint-define-config@1.17.0:
- resolution: {integrity: sha512-J1sweMoWsLcokaiAlfOCC4yMoHbvC/kDAxorm5TkUcD74w+kauMIyjKLM3dOadNxVKOjDiYN1Tu2x9N+4EUuuQ==}
+ /eslint-define-config@1.18.0:
+ resolution: {integrity: sha512-8qWT7aNU5M0W+WfoUixVaR79sqt3b280CK4bNPCkqXlTWUOYlEy3yEcXZFduvWawkNjuYWpZ2UjcBfvfnvGpvA==}
engines: {node: ^14.17.0 || ^16.13.0 || >=18.0.0, npm: '>=6.14.13', pnpm: '>= 7.0.0'}
dev: true
dependencies:
debug: 3.2.7
is-core-module: 2.12.0
- resolve: 1.22.2
+ resolve: 1.22.3
transitivePeerDependencies:
- supports-color
dev: true
debug: 4.3.4(supports-color@8.1.1)
enhanced-resolve: 5.12.0
eslint: 8.38.0
- eslint-module-utils: 2.7.4(@typescript-eslint/parser@5.58.0)(eslint-import-resolver-node@0.3.7)(eslint-import-resolver-typescript@3.5.5)(eslint@8.38.0)
+ eslint-module-utils: 2.8.0(@typescript-eslint/parser@5.58.0)(eslint-import-resolver-node@0.3.7)(eslint-import-resolver-typescript@3.5.5)(eslint@8.38.0)
eslint-plugin-import: 2.27.5(@typescript-eslint/parser@5.58.0)(eslint-import-resolver-typescript@3.5.5)(eslint@8.38.0)
get-tsconfig: 4.5.0
globby: 13.1.4
- supports-color
dev: true
- /eslint-module-utils@2.7.4(@typescript-eslint/parser@5.58.0)(eslint-import-resolver-node@0.3.7)(eslint-import-resolver-typescript@3.5.5)(eslint@8.38.0):
- resolution: {integrity: sha512-j4GT+rqzCoRKHwURX7pddtIPGySnX9Si/cgMI5ztrcqOPtk5dDEeZ34CQVPphnqkJytlc97Vuk05Um2mJ3gEQA==}
+ /eslint-module-utils@2.8.0(@typescript-eslint/parser@5.58.0)(eslint-import-resolver-node@0.3.7)(eslint-import-resolver-typescript@3.5.5)(eslint@8.38.0):
+ resolution: {integrity: sha512-aWajIYfsqCKRDgUfjEXNN/JlrzauMuSEy5sbd7WXbtW3EH6A6MpwEh42c7qD+MqQo9QMJ6fWLAeIJynx0g6OAw==}
engines: {node: '>=4'}
peerDependencies:
'@typescript-eslint/parser': '*'
doctrine: 2.1.0
eslint: 8.38.0
eslint-import-resolver-node: 0.3.7
- eslint-module-utils: 2.7.4(@typescript-eslint/parser@5.58.0)(eslint-import-resolver-node@0.3.7)(eslint-import-resolver-typescript@3.5.5)(eslint@8.38.0)
+ eslint-module-utils: 2.8.0(@typescript-eslint/parser@5.58.0)(eslint-import-resolver-node@0.3.7)(eslint-import-resolver-typescript@3.5.5)(eslint@8.38.0)
has: 1.0.3
is-core-module: 2.12.0
is-glob: 4.0.3
minimatch: 3.1.2
object.values: 1.1.6
- resolve: 1.22.2
+ resolve: 1.22.3
semver: 6.3.0
tsconfig-paths: 3.14.2
transitivePeerDependencies:
ignore: 5.2.4
is-core-module: 2.12.0
minimatch: 3.1.2
- resolve: 1.22.2
+ resolve: 1.22.3
semver: 7.4.0
dev: true
dependencies:
chalk: 5.2.0
cli-truncate: 3.1.0
- commander: 10.0.0
+ commander: 10.0.1
debug: 4.3.4(supports-color@8.1.1)
execa: 7.1.1
lilconfig: 2.1.0
resolution: {integrity: sha512-/5CMN3T0R4XTj4DcGaexo+roZSdSFW/0AOOTROrjxzCG1wrWXEsGbRKevjlIL+ZDE4sZlJr5ED4YW0yqmkK+eA==}
dependencies:
hosted-git-info: 2.8.9
- resolve: 1.22.2
+ resolve: 1.22.3
semver: 5.7.1
validate-npm-package-license: 3.0.4
dev: true
resolution: {integrity: sha512-HFM8rkZ+i3zrV+4LQjwQ0W+ez98pApMGM3HUrN04j3CqzPOzl9nmP15Y8YXNm8QHGv/eacOVEjqhmWpkRV0NAw==}
engines: {node: '>= 0.10'}
dependencies:
- resolve: 1.22.2
+ resolve: 1.22.3
dev: true
/redent@3.0.0:
path-parse: 1.0.7
dev: true
- /resolve@1.22.2:
- resolution: {integrity: sha512-Sb+mjNHOULsBv818T40qSPeRiuWLyaGMa5ewydRLFimneixmVy2zdivRl+AF6jaYPC8ERxGDmFSiqui6SfPd+g==}
+ /resolve@1.22.3:
+ resolution: {integrity: sha512-P8ur/gp/AmbEzjr729bZnLjXK5Z+4P0zhIJgBgzqRih7hL7BOukHGtSTA3ACMY467GRFz3duQsi0bDZdR7DKdw==}
hasBin: true
dependencies:
is-core-module: 2.12.0
import type { IWorker, Task, TasksUsage, WorkerNode } from './worker'
import {
WorkerChoiceStrategies,
- type WorkerChoiceStrategy
+ type WorkerChoiceStrategy,
+ type WorkerChoiceStrategyOptions
} from './selection-strategies/selection-strategies-types'
import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
import { CircularArray } from '../circular-array'
this.opts.enableEvents = opts.enableEvents ?? true
this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
if (this.opts.enableTasksQueue) {
- if ((opts.tasksQueueOptions?.concurrency as number) <= 0) {
- throw new Error(
- `Invalid worker tasks concurrency '${
- (opts.tasksQueueOptions as TasksQueueOptions).concurrency as number
- }'`
- )
- }
- this.opts.tasksQueueOptions = {
- concurrency: opts.tasksQueueOptions?.concurrency ?? 1
- }
+ this.checkValidTasksQueueOptions(
+ opts.tasksQueueOptions as TasksQueueOptions
+ )
+ this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
+ opts.tasksQueueOptions as TasksQueueOptions
+ )
}
}
}
}
+ private checkValidTasksQueueOptions (
+ tasksQueueOptions: TasksQueueOptions
+ ): void {
+ if ((tasksQueueOptions?.concurrency as number) <= 0) {
+ throw new Error(
+ `Invalid worker tasks concurrency '${
+ tasksQueueOptions.concurrency as number
+ }'`
+ )
+ }
+ }
+
/** @inheritDoc */
public abstract get type (): PoolType
})
}
this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
- workerChoiceStrategy
+ this.opts.workerChoiceStrategy
+ )
+ }
+
+ /** @inheritDoc */
+ public setWorkerChoiceStrategyOptions (
+ workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
+ ): void {
+ this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
+ this.workerChoiceStrategyContext.setOptions(
+ this.opts.workerChoiceStrategyOptions
)
}
+ /** @inheritDoc */
+ public enableTasksQueue (enable: boolean, opts?: TasksQueueOptions): void {
+ if (this.opts.enableTasksQueue === true && !enable) {
+ for (const [workerNodeKey] of this.workerNodes.entries()) {
+ this.flushTasksQueue(workerNodeKey)
+ }
+ }
+ this.opts.enableTasksQueue = enable
+ this.setTasksQueueOptions(opts as TasksQueueOptions)
+ }
+
+ /** @inheritDoc */
+ public setTasksQueueOptions (opts: TasksQueueOptions): void {
+ if (this.opts.enableTasksQueue === true) {
+ this.checkValidTasksQueueOptions(opts)
+ this.opts.tasksQueueOptions = this.buildTasksQueueOptions(opts)
+ } else {
+ delete this.opts.tasksQueueOptions
+ }
+ }
+
+ private buildTasksQueueOptions (
+ tasksQueueOptions: TasksQueueOptions
+ ): TasksQueueOptions {
+ return {
+ concurrency: tasksQueueOptions?.concurrency ?? 1
+ }
+ }
+
/**
* Whether the pool is full or not.
*
/**
* Pool worker tasks queue.
*
- * @experimental
* @defaultValue false
*/
enableTasksQueue?: boolean
/**
* Pool worker tasks queue options.
- *
- * @experimental
*/
tasksQueueOptions?: TasksQueueOptions
}
* @param workerChoiceStrategy - The worker choice strategy.
*/
setWorkerChoiceStrategy: (workerChoiceStrategy: WorkerChoiceStrategy) => void
+ /**
+ * Sets the worker choice strategy options in this pool.
+ *
+ * @param workerChoiceStrategyOptions - The worker choice strategy options.
+ */
+ setWorkerChoiceStrategyOptions: (
+ workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
+ ) => void
+ /**
+ * Enables/disables the worker tasks queue in this pool.
+ *
+ * @param enable - Whether to enable or disable the worker tasks queue.
+ * @param tasksQueueOptions - The worker tasks queue options.
+ */
+ enableTasksQueue: (enable: boolean, opts?: TasksQueueOptions) => void
+ /**
+ * Sets the worker tasks queue options in this pool.
+ *
+ * @param tasksQueueOptions - The worker tasks queue options.
+ */
+ setTasksQueueOptions: (tasksQueueOptions: TasksQueueOptions) => void
}
*/
public constructor (
protected readonly pool: IPool<Worker, Data, Response>,
- protected readonly opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+ protected opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
) {
this.isDynamicPool = this.pool.type === PoolType.DYNAMIC
this.choose.bind(this)
this.requiredStatistics.avgRunTime = false
this.requiredStatistics.medRunTime = opts.medRunTime as boolean
}
+ if (this.requiredStatistics.medRunTime && opts.medRunTime === false) {
+ this.requiredStatistics.avgRunTime = true
+ this.requiredStatistics.medRunTime = opts.medRunTime as boolean
+ }
}
/** @inheritDoc */
/** @inheritDoc */
public abstract remove (workerNodeKey: number): boolean
+
+ /** @inheritDoc */
+ public setOptions (opts: WorkerChoiceStrategyOptions): void {
+ this.checkOptions(opts)
+ this.opts = opts
+ }
}
opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
) {
super(pool, opts)
- this.checkOptions(opts)
+ this.checkOptions(this.opts)
}
/** @inheritDoc */
opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
) {
super(pool, opts)
- this.checkOptions(opts)
+ this.checkOptions(this.opts)
}
/** @inheritDoc */
opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
) {
super(pool, opts)
- this.checkOptions(opts)
+ this.checkOptions(this.opts)
}
/** @inheritDoc */
opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
) {
super(pool, opts)
- this.checkOptions(opts)
+ this.checkOptions(this.opts)
}
/** @inheritDoc */
* @param workerNodeKey - The worker node key.
*/
remove: (workerNodeKey: number) => boolean
+ /**
+ * Sets the worker choice strategy options.
+ *
+ * @param opts - The worker choice strategy options.
+ */
+ setOptions: (opts: WorkerChoiceStrategyOptions) => void
}
opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
) {
super(pool, opts)
- this.checkOptions(opts)
+ this.checkOptions(this.opts)
this.defaultWorkerWeight = this.computeWorkerWeight()
this.initWorkersTaskRunTime()
}
) as IWorkerChoiceStrategy
).remove(workerNodeKey)
}
+
+ /**
+ * Sets the worker choice strategies in the context options.
+ *
+ * @param opts - The worker choice strategy options.
+ */
+ public setOptions (opts: WorkerChoiceStrategyOptions): void {
+ this.workerChoiceStrategies.forEach(workerChoiceStrategy => {
+ workerChoiceStrategy.setOptions(opts)
+ })
+ }
}
await pool.destroy()
})
- it('Verify that pool options are valid', async () => {
+ it('Verify that pool options are validated', async () => {
expect(
() =>
new FixedThreadPool(
).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
})
+ it('Verify that worker choice strategy options can be set', async () => {
+ const pool = new FixedThreadPool(
+ numberOfWorkers,
+ './tests/worker-files/thread/testWorker.js',
+ { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
+ )
+ expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
+ medRunTime: false
+ })
+ for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
+ .workerChoiceStrategies) {
+ expect(workerChoiceStrategy.opts).toStrictEqual({ medRunTime: false })
+ }
+ expect(
+ pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime
+ ).toBe(true)
+ expect(
+ pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime
+ ).toBe(false)
+ pool.setWorkerChoiceStrategyOptions({ medRunTime: true })
+ expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
+ medRunTime: true
+ })
+ for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
+ .workerChoiceStrategies) {
+ expect(workerChoiceStrategy.opts).toStrictEqual({ medRunTime: true })
+ }
+ expect(
+ pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime
+ ).toBe(false)
+ expect(
+ pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime
+ ).toBe(true)
+ pool.setWorkerChoiceStrategyOptions({ medRunTime: false })
+ expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
+ medRunTime: false
+ })
+ for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
+ .workerChoiceStrategies) {
+ expect(workerChoiceStrategy.opts).toStrictEqual({ medRunTime: false })
+ }
+ expect(
+ pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime
+ ).toBe(true)
+ expect(
+ pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime
+ ).toBe(false)
+ await pool.destroy()
+ })
+
+ it('Verify that tasks queue can be enabled/disabled', async () => {
+ const pool = new FixedThreadPool(
+ numberOfWorkers,
+ './tests/worker-files/thread/testWorker.js'
+ )
+ expect(pool.opts.enableTasksQueue).toBe(false)
+ expect(pool.opts.tasksQueueOptions).toBeUndefined()
+ pool.enableTasksQueue(true)
+ expect(pool.opts.enableTasksQueue).toBe(true)
+ expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
+ pool.enableTasksQueue(true, { concurrency: 2 })
+ expect(pool.opts.enableTasksQueue).toBe(true)
+ expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
+ pool.enableTasksQueue(false)
+ expect(pool.opts.enableTasksQueue).toBe(false)
+ expect(pool.opts.tasksQueueOptions).toBeUndefined()
+ await pool.destroy()
+ })
+
+ it('Verify that tasks queue options can be set', async () => {
+ const pool = new FixedThreadPool(
+ numberOfWorkers,
+ './tests/worker-files/thread/testWorker.js',
+ { enableTasksQueue: true }
+ )
+ expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
+ pool.setTasksQueueOptions({ concurrency: 2 })
+ expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
+ expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError(
+ "Invalid worker tasks concurrency '0'"
+ )
+ await pool.destroy()
+ })
+
it('Simulate worker not found at getWorkerTasksUsage()', async () => {
const pool = new StubPoolWithRemoveAllWorker(
numberOfWorkers,
)
// TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
const promises = []
- for (let i = 0; i < max * 2; i++) {
+ const maxMultiplier = 2
+ for (let i = 0; i < max * maxMultiplier; i++) {
promises.push(pool.execute())
}
await Promise.all(promises)
)
// TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
const promises = []
- for (let i = 0; i < max * 2; i++) {
+ const maxMultiplier = 2
+ for (let i = 0; i < max * maxMultiplier; i++) {
promises.push(pool.execute())
}
await Promise.all(promises)
)
// TODO: Create a better test to cover `LessUsedWorkerChoiceStrategy#choose`
const promises = []
- for (let i = 0; i < max * 2; i++) {
+ const maxMultiplier = 2
+ for (let i = 0; i < max * maxMultiplier; i++) {
promises.push(pool.execute())
}
await Promise.all(promises)
)
// TODO: Create a better test to cover `LessUsedWorkerChoiceStrategy#choose`
const promises = []
- for (let i = 0; i < max * 2; i++) {
+ const maxMultiplier = 2
+ for (let i = 0; i < max * maxMultiplier; i++) {
promises.push(pool.execute())
}
await Promise.all(promises)
)
// TODO: Create a better test to cover `LessBusyWorkerChoiceStrategy#choose`
const promises = []
- for (let i = 0; i < max * 2; i++) {
+ const maxMultiplier = 2
+ for (let i = 0; i < max * maxMultiplier; i++) {
promises.push(pool.execute())
}
await Promise.all(promises)
)
// TODO: Create a better test to cover `LessBusyWorkerChoiceStrategy#choose`
const promises = []
- for (let i = 0; i < max * 2; i++) {
+ const maxMultiplier = 2
+ for (let i = 0; i < max * maxMultiplier; i++) {
promises.push(pool.execute())
}
await Promise.all(promises)
)
// TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
const promises = []
- for (let i = 0; i < max * 2; i++) {
+ const maxMultiplier = 2
+ for (let i = 0; i < max * maxMultiplier; i++) {
promises.push(pool.execute())
}
await Promise.all(promises)
)
// TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
const promises = []
- for (let i = 0; i < max * 2; i++) {
+ const maxMultiplier = 2
+ for (let i = 0; i < max * maxMultiplier; i++) {
promises.push(pool.execute())
}
await Promise.all(promises)