"check-coverage": true,
"lines": 90,
"statements": 90,
- "functions": 92,
- "branches": 92
+ "functions": 90,
+ "branches": 90
}
## [Unreleased]
+### Fixed
+
+- Properly handle dynamic pool with zero minimum size.
+
## [3.1.13] - 2023-12-30
### Changed
private setTaskStealing (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
- this.workerNodes[workerNodeKey].on(
- 'idleWorkerNode',
- this.handleIdleWorkerNodeEvent
- )
+ this.workerNodes[workerNodeKey].on('idle', this.handleWorkerNodeIdleEvent)
}
}
private unsetTaskStealing (): void {
for (const [workerNodeKey] of this.workerNodes.entries()) {
this.workerNodes[workerNodeKey].off(
- 'idleWorkerNode',
- this.handleIdleWorkerNodeEvent
+ 'idle',
+ this.handleWorkerNodeIdleEvent
)
}
}
for (const [workerNodeKey] of this.workerNodes.entries()) {
this.workerNodes[workerNodeKey].on(
'backPressure',
- this.handleBackPressureEvent
+ this.handleWorkerNodeBackPressureEvent
)
}
}
for (const [workerNodeKey] of this.workerNodes.entries()) {
this.workerNodes[workerNodeKey].off(
'backPressure',
- this.handleBackPressureEvent
+ this.handleWorkerNodeBackPressureEvent
)
}
}
})
}
})
- const workerInfo = this.getWorkerInfo(workerNodeKey)
this.sendToWorker(workerNodeKey, {
checkActive: true
})
})
}
}
- workerInfo.dynamic = true
+ const workerNode = this.workerNodes[workerNodeKey]
+ workerNode.info.dynamic = true
if (
this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
) {
- workerInfo.ready = true
+ workerNode.info.ready = true
}
this.checkAndEmitDynamicWorkerCreationEvents()
return workerNodeKey
if (this.opts.enableTasksQueue === true) {
if (this.opts.tasksQueueOptions?.taskStealing === true) {
this.workerNodes[workerNodeKey].on(
- 'idleWorkerNode',
- this.handleIdleWorkerNodeEvent
+ 'idle',
+ this.handleWorkerNodeIdleEvent
)
}
if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
this.workerNodes[workerNodeKey].on(
'backPressure',
- this.handleBackPressureEvent
+ this.handleWorkerNodeBackPressureEvent
)
}
}
}
}
- private readonly handleIdleWorkerNodeEvent = (
+ private readonly handleWorkerNodeIdleEvent = (
eventDetail: WorkerNodeEventDetail,
previousStolenTask?: Task<Data>
): void => {
}
sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
.then(() => {
- this.handleIdleWorkerNodeEvent(eventDetail, stolenTask)
+ this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
return undefined
})
.catch(EMPTY_FUNCTION)
}
}
- private readonly handleBackPressureEvent = (
+ private readonly handleWorkerNodeBackPressureEvent = (
eventDetail: WorkerNodeEventDetail
): void => {
if (
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
const { workerId, ready, taskFunctionNames } = message
- if (ready === false) {
+ if (ready == null || !ready) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
throw new Error(`Worker ${workerId!} failed to initialize`)
}
- const workerInfo = this.getWorkerInfo(
- this.getWorkerNodeKeyByWorkerId(workerId)
- )
- workerInfo.ready = ready as boolean
- workerInfo.taskFunctionNames = taskFunctionNames
+ const workerNode =
+ this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
+ workerNode.info.ready = ready
+ workerNode.info.taskFunctionNames = taskFunctionNames
if (!this.readyEventEmitted && this.ready) {
this.readyEventEmitted = true
this.emitter?.emit(PoolEvents.ready, this.info)
this.tasksQueueSize(workerNodeKey) === 0 &&
workerNodeTasksUsage.sequentiallyStolen === 0
) {
- workerNode.emit('idleWorkerNode', {
+ workerNode.emit('idle', {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
workerId: workerId!,
workerNodeKey
/** @inheritDoc */
protected shallCreateDynamicWorker (): boolean {
- return !this.full && this.internalBusy()
+ return (
+ (!this.full && this.internalBusy()) ||
+ (this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0)
+ )
}
/** @inheritDoc */
}
private fairShareNextWorkerNodeKey (): number | undefined {
+ if (this.pool.workerNodes.length === 0) {
+ return undefined
+ }
return this.pool.workerNodes.reduce(
(minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
if (workerNode.strategyData?.virtualTaskEndTimestamp == null) {
}
private leastBusyNextWorkerNodeKey (): number | undefined {
+ if (this.pool.workerNodes.length === 0) {
+ return undefined
+ }
return this.pool.workerNodes.reduce(
(minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
return this.isWorkerNodeReady(workerNodeKey) &&
}
private leastEluNextWorkerNodeKey (): number | undefined {
+ if (this.pool.workerNodes.length === 0) {
+ return undefined
+ }
return this.pool.workerNodes.reduce(
(minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
return this.isWorkerNodeReady(workerNodeKey) &&
}
private leastUsedNextWorkerNodeKey (): number | undefined {
+ if (this.pool.workerNodes.length === 0) {
+ return undefined
+ }
return this.pool.workerNodes.reduce(
(minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
return this.isWorkerNodeReady(workerNodeKey) &&
* @throws {@link https://nodejs.org/api/errors.html#class-error} If after configured retries the worker node key is null or undefined.
*/
public execute (): number {
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- const workerChoiceStrategy = this.workerChoiceStrategies.get(
- this.workerChoiceStrategy
- )!
- if (!workerChoiceStrategy.hasPoolWorkerNodesReady()) {
- return this.execute()
- }
- return this.executeStrategy(workerChoiceStrategy)
+ return this.executeStrategy(
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.workerChoiceStrategies.get(this.workerChoiceStrategy)!
+ )
}
/**
*
* @param workerChoiceStrategy - The worker choice strategy.
* @returns The key of the worker node.
- * @throws {@link https://nodejs.org/api/errors.html#class-error} If after configured retries the worker node key is null or undefined.
+ * @throws {@link https://nodejs.org/api/errors.html#class-error} If after computed retries the worker node key is null or undefined.
*/
private executeStrategy (workerChoiceStrategy: IWorkerChoiceStrategy): number {
let workerNodeKey: number | undefined
/** @inheritDoc */
protected shallCreateDynamicWorker (): boolean {
- return !this.full && this.internalBusy()
+ return (
+ (!this.full && this.internalBusy()) ||
+ (this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0)
+ )
}
/** @inheritDoc */
'./tests/worker-files/thread/testWorker.mjs'
)
expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
+ expect(pool.emitter.eventNames()).toStrictEqual([])
expect(pool.opts).toStrictEqual({
startWorkers: true,
enableEvents: true,
// We need to clean up the resources after our test
await pool.destroy()
})
+
+ it.skip('Verify that a pool with zero worker works', async () => {
+ const pool = new DynamicClusterPool(
+ 0,
+ max,
+ './tests/worker-files/thread/testWorker.mjs'
+ )
+ expect(pool.starting).toBe(false)
+ expect(pool.workerNodes.length).toBe(pool.info.minSize)
+ const maxMultiplier = 10000
+ const promises = new Set()
+ for (let i = 0; i < max * maxMultiplier; i++) {
+ promises.add(pool.execute())
+ }
+ await Promise.all(promises)
+ expect(pool.workerNodes.length).toBe(max)
+ // We need to clean up the resources after our test
+ await pool.destroy()
+ })
})
await pool.destroy()
})
- it('Verify strategies wait for worker node readiness in dynamic pool', async () => {
- const pool = new DynamicThreadPool(
- min,
- max,
- './tests/worker-files/thread/testWorker.mjs'
- )
- expect(pool.starting).toBe(false)
- expect(pool.workerNodes.length).toBe(min)
- const maxMultiplier = 10000
- const promises = new Set()
- for (let i = 0; i < max * maxMultiplier; i++) {
- promises.add(pool.execute())
- }
- await Promise.all(promises)
- expect(pool.workerNodes.length).toBe(max)
- // We need to clean up the resources after our test
- await pool.destroy()
- })
-
it('Verify ROUND_ROBIN strategy default policy', async () => {
const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
let pool = new FixedThreadPool(
workerChoiceStrategyStub
)
const chosenWorkerKey = workerChoiceStrategyContext.execute()
- expect(
- workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategyContext.workerChoiceStrategy
- ).hasPoolWorkerNodesReady.callCount
- ).toBe(6)
expect(
workerChoiceStrategyContext.workerChoiceStrategies.get(
workerChoiceStrategyContext.workerChoiceStrategy
expect(chosenWorkerKey).toBe(1)
})
- it('Verify that execute() throws error if worker choice strategy recursion reach the maximum depth', () => {
- const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
- fixedPool
- )
- const workerChoiceStrategyStub = createStubInstance(
- RoundRobinWorkerChoiceStrategy,
- {
- hasPoolWorkerNodesReady: stub().returns(false),
- choose: stub().returns(0)
- }
- )
- expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
- WorkerChoiceStrategies.ROUND_ROBIN
- )
- workerChoiceStrategyContext.workerChoiceStrategies.set(
- workerChoiceStrategyContext.workerChoiceStrategy,
- workerChoiceStrategyStub
- )
- expect(() => workerChoiceStrategyContext.execute()).toThrow(
- new RangeError('Maximum call stack size exceeded')
- )
- })
-
it('Verify that execute() return the worker node key chosen by the strategy with dynamic pool', () => {
const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
dynamicPool
// We need to clean up the resources after our test
await pool.destroy()
})
+
+ it('Verify that a pool with zero worker works', async () => {
+ const pool = new DynamicThreadPool(
+ 0,
+ max,
+ './tests/worker-files/thread/testWorker.mjs'
+ )
+ expect(pool.starting).toBe(false)
+ expect(pool.workerNodes.length).toBe(pool.info.minSize)
+ const maxMultiplier = 10000
+ const promises = new Set()
+ for (let i = 0; i < max * maxMultiplier; i++) {
+ promises.add(pool.execute())
+ }
+ await Promise.all(promises)
+ expect(pool.workerNodes.length).toBe(max)
+ // We need to clean up the resources after our test
+ await pool.destroy()
+ })
})