## [Unreleased]
+### Changed
+
+- Optimize worker choice strategy for dynamic pool.
+
### Fixed
- Ensure dynamic pool does not alter worker choice strategy expected behavior.
* A cluster pool with a dynamic number of workers, but a guaranteed minimum number of workers.
*
* This cluster pool creates new workers when the others are busy, up to the maximum number of workers.
- * When the maximum number of workers is reached, an event is emitted. If you want to listen to this event, use the pool's `emitter`.
+ * When the maximum number of workers is reached and workers are busy, an event is emitted. If you want to listen to this event, use the pool's `emitter`.
*
* @typeParam Data - Type of data sent to the worker. This can only be serializable data.
* @typeParam Response - Type of response of execution. This can only be serializable data.
* Internal contract definition for a poolifier pool.
*
* @typeParam Worker - Type of worker which manages this pool.
- * @typeParam Data - Type of data sent to the worker.
- * @typeParam Response - Type of response of execution.
+ * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
+ * @typeParam Response - Type of response of execution. This can only be serializable data.
*/
export interface IPoolInternal<
Worker extends IPoolWorker,
} from './selection-strategies-types'
/**
- * Abstract worker choice strategy class.
+ * Worker choice strategy abstract base class.
*
* @typeParam Worker - Type of worker which manages the strategy.
* @typeParam Data - Type of data sent to the worker. This can only be serializable data.
+++ /dev/null
-import type { IPoolInternal } from '../pool-internal'
-import type { IPoolWorker } from '../pool-worker'
-import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
-import type {
- IWorkerChoiceStrategy,
- WorkerChoiceStrategy
-} from './selection-strategies-types'
-import { WorkerChoiceStrategies } from './selection-strategies-types'
-import { getWorkerChoiceStrategy } from './selection-strategies-utils'
-
-/**
- * Selects the next worker for dynamic pool.
- *
- * @typeParam Worker - Type of worker which manages the strategy.
- * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
- * @typeParam Response - Type of response of execution. This can only be serializable data.
- */
-export class DynamicPoolWorkerChoiceStrategy<
- Worker extends IPoolWorker,
- Data,
- Response
- >
- extends AbstractWorkerChoiceStrategy<Worker, Data, Response>
- implements IWorkerChoiceStrategy {
- private readonly workerChoiceStrategy: IWorkerChoiceStrategy
-
- /**
- * Constructs a worker choice strategy for dynamic pool.
- *
- * @param pool - The pool instance.
- * @param createWorkerCallback - The worker creation callback for dynamic pool.
- * @param workerChoiceStrategy - The worker choice strategy when the pool is busy.
- */
- public constructor (
- pool: IPoolInternal<Worker, Data, Response>,
- private readonly createWorkerCallback: () => number,
- workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
- ) {
- super(pool)
- this.workerChoiceStrategy = getWorkerChoiceStrategy(
- this.pool,
- workerChoiceStrategy
- )
- this.requiredStatistics = this.workerChoiceStrategy.requiredStatistics
- }
-
- /** {@inheritDoc} */
- public reset (): boolean {
- return this.workerChoiceStrategy.reset()
- }
-
- /** {@inheritDoc} */
- public choose (): number {
- if (this.pool.busy) {
- return this.workerChoiceStrategy.choose()
- }
- if (!this.pool.full && this.pool.findFreeWorkerKey() === -1) {
- return this.createWorkerCallback()
- }
- return this.workerChoiceStrategy.choose()
- }
-
- /** {@inheritDoc} */
- public remove (workerKey: number): boolean {
- return this.workerChoiceStrategy.remove(workerKey)
- }
-}
export type WorkerChoiceStrategy = keyof typeof WorkerChoiceStrategies
/**
- * Pool tasks usage statistics requirements.
+ * Pool worker tasks usage statistics requirements.
*/
export interface RequiredStatistics {
runTime: boolean
import type { IPoolInternal } from '../pool-internal'
import { PoolType } from '../pool-internal'
import type { IPoolWorker } from '../pool-worker'
-import { DynamicPoolWorkerChoiceStrategy } from './dynamic-pool-worker-choice-strategy'
import type {
IWorkerChoiceStrategy,
RequiredStatistics,
this.setWorkerChoiceStrategy(workerChoiceStrategy)
}
- /**
- * Gets the worker choice strategy instance specific to the pool type.
- *
- * @param workerChoiceStrategy - The worker choice strategy.
- * @returns The worker choice strategy instance for the pool type.
- */
- private getPoolWorkerChoiceStrategy (
- workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
- ): IWorkerChoiceStrategy {
- if (this.pool.type === PoolType.DYNAMIC) {
- return new DynamicPoolWorkerChoiceStrategy(
- this.pool,
- this.createWorkerCallback,
- workerChoiceStrategy
- )
- }
- return getWorkerChoiceStrategy(this.pool, workerChoiceStrategy)
- }
-
/**
* Gets the worker choice strategy required statistics.
*
workerChoiceStrategy: WorkerChoiceStrategy
): void {
this.workerChoiceStrategy?.reset()
- this.workerChoiceStrategy =
- this.getPoolWorkerChoiceStrategy(workerChoiceStrategy)
+ this.workerChoiceStrategy = getWorkerChoiceStrategy(
+ this.pool,
+ workerChoiceStrategy
+ )
}
/**
- * Chooses a worker with the underlying selection strategy.
+ * Chooses a worker with the worker choice strategy.
*
* @returns The key of the chosen one.
*/
public execute (): number {
+ if (
+ this.pool.type === PoolType.DYNAMIC &&
+ !this.pool.full &&
+ this.pool.findFreeWorkerKey() === -1
+ ) {
+ return this.createWorkerCallback()
+ }
return this.workerChoiceStrategy.choose()
}
/**
- * Removes a worker in the underlying selection strategy internals.
+ * Removes a worker in the worker choice strategy internals.
*
* @param workerKey - The key of the worker to remove.
* @returns `true` if the removal is successful, `false` otherwise.
* A thread pool with a dynamic number of threads, but a guaranteed minimum number of threads.
*
* This thread pool creates new threads when the others are busy, up to the maximum number of threads.
- * When the maximum number of threads is reached, an event is emitted. If you want to listen to this event, use the pool's `emitter`.
+ * When the maximum number of threads is reached and workers are busy, an event is emitted. If you want to listen to this event, use the pool's `emitter`.
*
* @typeParam Data - Type of data sent to the worker. This can only be serializable data.
* @typeParam Response - Type of response of execution. This can only be serializable data.
{ workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategy.workerChoiceStrategy
- .nextWorkerId
+ pool.workerChoiceStrategyContext.workerChoiceStrategy.nextWorkerId
).toBeUndefined()
pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.ROUND_ROBIN)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategy.workerChoiceStrategy
- .nextWorkerId
+ pool.workerChoiceStrategyContext.workerChoiceStrategy.nextWorkerId
).toBe(0)
// We need to clean up the resources after our test
await pool.destroy()
// if (process.platform !== 'win32') {
// expect(
// pool.workerChoiceStrategyContext.workerChoiceStrategy
- // .workerChoiceStrategy.workerLastVirtualTaskTimestamp.size
+ // .workerLastVirtualTaskTimestamp.size
// ).toBe(pool.workers.length)
// }
// We need to clean up the resources after our test
'./tests/worker-files/thread/testWorker.js'
)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategy.workerChoiceStrategy
+ pool.workerChoiceStrategyContext.workerChoiceStrategy
.workerLastVirtualTaskTimestamp
).toBeUndefined()
pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
- for (const workerKey of pool.workerChoiceStrategyContext.workerChoiceStrategy.workerChoiceStrategy.workerLastVirtualTaskTimestamp.keys()) {
+ for (const workerKey of pool.workerChoiceStrategyContext.workerChoiceStrategy.workerLastVirtualTaskTimestamp.keys()) {
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategy.workerChoiceStrategy.workerLastVirtualTaskTimestamp.get(
+ pool.workerChoiceStrategyContext.workerChoiceStrategy.workerLastVirtualTaskTimestamp.get(
workerKey
).start
).toBe(0)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategy.workerChoiceStrategy.workerLastVirtualTaskTimestamp.get(
+ pool.workerChoiceStrategyContext.workerChoiceStrategy.workerLastVirtualTaskTimestamp.get(
workerKey
).end
).toBe(0)
// TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
const promises = []
const maxMultiplier =
- pool.workerChoiceStrategyContext.workerChoiceStrategy.workerChoiceStrategy
+ pool.workerChoiceStrategyContext.workerChoiceStrategy
.defaultWorkerWeight * 2
for (let i = 0; i < max * maxMultiplier; i++) {
promises.push(pool.execute())
await Promise.all(promises)
if (process.platform !== 'win32') {
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
- .workerChoiceStrategy.workersTaskRunTime.size
+ pool.workerChoiceStrategyContext.workerChoiceStrategy.workersTaskRunTime
+ .size
).toBe(pool.workers.length)
}
// We need to clean up the resources after our test
'./tests/worker-files/thread/testWorker.js'
)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategy.workerChoiceStrategy
- .currentWorkerId
+ pool.workerChoiceStrategyContext.workerChoiceStrategy.currentWorkerId
).toBeUndefined()
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategy.workerChoiceStrategy
- .defaultWorkerWeight
+ pool.workerChoiceStrategyContext.workerChoiceStrategy.defaultWorkerWeight
).toBeUndefined()
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategy.workerChoiceStrategy
- .workersTaskRunTime
+ pool.workerChoiceStrategyContext.workerChoiceStrategy.workersTaskRunTime
).toBeUndefined()
pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategy.workerChoiceStrategy
- .currentWorkerId
+ pool.workerChoiceStrategyContext.workerChoiceStrategy.currentWorkerId
).toBe(0)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategy.workerChoiceStrategy
- .defaultWorkerWeight
+ pool.workerChoiceStrategyContext.workerChoiceStrategy.defaultWorkerWeight
).toBeGreaterThan(0)
- for (const workerKey of pool.workerChoiceStrategyContext.workerChoiceStrategy.workerChoiceStrategy.workersTaskRunTime.keys()) {
+ for (const workerKey of pool.workerChoiceStrategyContext.workerChoiceStrategy.workersTaskRunTime.keys()) {
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategy.workerChoiceStrategy.workersTaskRunTime.get(
+ pool.workerChoiceStrategyContext.workerChoiceStrategy.workersTaskRunTime.get(
workerKey
).runTime
).toBe(0)
const {
WeightedRoundRobinWorkerChoiceStrategy
} = require('../../../lib/pools/selection-strategies/weighted-round-robin-worker-choice-strategy')
-const {
- DynamicPoolWorkerChoiceStrategy
-} = require('../../../lib/pools/selection-strategies/dynamic-pool-worker-choice-strategy')
describe('Worker choice strategy context test suite', () => {
const min = 1
WorkerChoiceStrategies.ROUND_ROBIN
)
expect(workerChoiceStrategyContext.workerChoiceStrategy).toBeInstanceOf(
- DynamicPoolWorkerChoiceStrategy
+ RoundRobinWorkerChoiceStrategy
)
- expect(
- workerChoiceStrategyContext.workerChoiceStrategy.workerChoiceStrategy
- ).toBeInstanceOf(RoundRobinWorkerChoiceStrategy)
})
it('Verify that setWorkerChoiceStrategy() works with LESS_USED and fixed pool', () => {
WorkerChoiceStrategies.LESS_USED
)
expect(workerChoiceStrategyContext.workerChoiceStrategy).toBeInstanceOf(
- DynamicPoolWorkerChoiceStrategy
+ LessUsedWorkerChoiceStrategy
)
- expect(
- workerChoiceStrategyContext.workerChoiceStrategy.workerChoiceStrategy
- ).toBeInstanceOf(LessUsedWorkerChoiceStrategy)
})
it('Verify that setWorkerChoiceStrategy() works with LESS_BUSY and fixed pool', () => {
WorkerChoiceStrategies.LESS_BUSY
)
expect(workerChoiceStrategyContext.workerChoiceStrategy).toBeInstanceOf(
- DynamicPoolWorkerChoiceStrategy
+ LessBusyWorkerChoiceStrategy
)
- expect(
- workerChoiceStrategyContext.workerChoiceStrategy.workerChoiceStrategy
- ).toBeInstanceOf(LessBusyWorkerChoiceStrategy)
})
it('Verify that setWorkerChoiceStrategy() works with FAIR_SHARE and fixed pool', () => {
WorkerChoiceStrategies.FAIR_SHARE
)
expect(workerChoiceStrategyContext.workerChoiceStrategy).toBeInstanceOf(
- DynamicPoolWorkerChoiceStrategy
+ FairShareWorkerChoiceStrategy
)
- expect(
- workerChoiceStrategyContext.workerChoiceStrategy.workerChoiceStrategy
- ).toBeInstanceOf(FairShareWorkerChoiceStrategy)
})
it('Verify that setWorkerChoiceStrategy() works with WEIGHTED_ROUND_ROBIN and fixed pool', () => {
WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
)
expect(workerChoiceStrategyContext.workerChoiceStrategy).toBeInstanceOf(
- DynamicPoolWorkerChoiceStrategy
+ WeightedRoundRobinWorkerChoiceStrategy
)
- expect(
- workerChoiceStrategyContext.workerChoiceStrategy.workerChoiceStrategy
- ).toBeInstanceOf(WeightedRoundRobinWorkerChoiceStrategy)
})
})