* The pool readiness boolean status.
*/
private get ready (): boolean {
+ if (this.empty) {
+ return false
+ }
return (
this.workerNodes.reduce(
(accumulator, workerNode) =>
)
}
+ /**
+ * The pool emptiness boolean status.
+ */
+ protected get empty (): boolean {
+ if (this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0) {
+ return true
+ }
+ return false
+ }
+
/**
* The approximate pool utilization.
*
}
}
+ private checkAndEmitReadyEvent (): void {
+ if (!this.readyEventEmitted && this.ready) {
+ this.emitter?.emit(PoolEvents.ready, this.info)
+ this.readyEventEmitted = true
+ }
+ }
+
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
const { workerId, ready, taskFunctionNames } = message
if (ready == null || !ready) {
this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
workerNode.info.ready = ready
workerNode.info.taskFunctionNames = taskFunctionNames
- if (!this.readyEventEmitted && this.ready) {
- this.emitter?.emit(PoolEvents.ready, this.info)
- this.readyEventEmitted = true
- }
+ this.checkAndEmitReadyEvent()
}
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
return workerNodeKey
}
+ private checkAndEmitEmptyEvent (): void {
+ if (this.empty) {
+ this.emitter?.emit(PoolEvents.empty, this.info)
+ this.readyEventEmitted = false
+ }
+ }
+
/**
* Removes the worker node from the pool worker nodes.
*
this.workerNodes.splice(workerNodeKey, 1)
this.workerChoiceStrategyContext?.remove(workerNodeKey)
}
+ this.checkAndEmitEmptyEvent()
}
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
/** @inheritDoc */
protected shallCreateDynamicWorker (): boolean {
- return (
- (!this.full && this.internalBusy()) ||
- (this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0)
- )
+ return (!this.full && this.internalBusy()) || this.empty
}
/** @inheritDoc */
ready: 'ready',
busy: 'busy',
full: 'full',
+ empty: 'empty',
destroy: 'destroy',
error: 'error',
taskError: 'taskError',
*
* Events that can currently be listened to:
*
- * - `'ready'`: Emitted when the number of workers created in the pool has reached the minimum size expected and are ready.
+ * - `'ready'`: Emitted when the number of workers created in the pool has reached the minimum size expected and are ready. If the pool is dynamic with a minimum number of workers is set to zero, this event is emitted when at least one dynamic worker is ready.
* - `'busy'`: Emitted when the number of workers created in the pool has reached the maximum size expected and are executing concurrently their tasks quota.
* - `'full'`: Emitted when the pool is dynamic and the number of workers created has reached the maximum size expected.
+ * - `'empty'`: Emitted when the pool is dynamic with a minimum number of workers set to zero and the number of workers has reached the minimum size expected.
* - `'destroy'`: Emitted when the pool is destroyed.
* - `'error'`: Emitted when an uncaught error occurs.
* - `'taskError'`: Emitted when an error occurs while executing a task.
}
/**
- * Check the next worker node readiness.
+ * Check the next worker node key.
*/
- protected checkNextWorkerNodeReadiness (): void {
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- if (!this.isWorkerNodeReady(this.nextWorkerNodeKey!)) {
+ protected checkNextWorkerNodeKey (): void {
+ if (
+ this.nextWorkerNodeKey != null &&
+ (this.nextWorkerNodeKey < 0 ||
+ !this.isWorkerNodeReady(this.nextWorkerNodeKey))
+ ) {
delete this.nextWorkerNodeKey
}
}
* @param workerNodeKey - The worker node key.
*/
protected setPreviousWorkerNodeKey (workerNodeKey: number | undefined): void {
- this.previousWorkerNodeKey = workerNodeKey ?? this.previousWorkerNodeKey
+ this.previousWorkerNodeKey =
+ workerNodeKey != null && workerNodeKey >= 0
+ ? workerNodeKey
+ : this.previousWorkerNodeKey
}
}
}
private interleavedWeightedRoundRobinNextWorkerNodeId (): void {
- if (
+ if (this.pool.workerNodes.length === 0) {
+ this.workerNodeId = 0
+ } else if (
this.roundId === this.roundWeights.length - 1 &&
this.workerNodeId === this.pool.workerNodes.length - 1
) {
const chosenWorkerNodeKey = this.nextWorkerNodeKey
this.setPreviousWorkerNodeKey(chosenWorkerNodeKey)
this.roundRobinNextWorkerNodeKey()
- this.checkNextWorkerNodeReadiness()
+ this.checkNextWorkerNodeKey()
return chosenWorkerNodeKey
}
public choose (): number | undefined {
this.setPreviousWorkerNodeKey(this.nextWorkerNodeKey)
this.weightedRoundRobinNextWorkerNodeKey()
- this.checkNextWorkerNodeReadiness()
+ this.checkNextWorkerNodeKey()
return this.nextWorkerNodeKey
}
let retriesCount = 0
do {
workerNodeKey = workerChoiceStrategy.choose()
+ if (workerNodeKey != null && workerNodeKey < 0) {
+ workerNodeKey = undefined
+ }
if (workerNodeKey == null && chooseCount > 0) {
retriesCount++
}
/** @inheritDoc */
protected shallCreateDynamicWorker (): boolean {
- return (
- (!this.full && this.internalBusy()) ||
- (this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0)
- )
+ return (!this.full && this.internalBusy()) || this.empty
}
/** @inheritDoc */
)
expect(pool.info.started).toBe(false)
expect(pool.info.ready).toBe(false)
- expect(pool.readyEventEmitted).toBe(false)
expect(pool.workerNodes).toStrictEqual([])
+ expect(pool.readyEventEmitted).toBe(false)
await expect(pool.execute()).rejects.toThrow(
new Error('Cannot execute a task on not started pool')
)
import { expect } from 'expect'
-import { DynamicClusterPool, PoolEvents } from '../../../lib/index.cjs'
+import {
+ DynamicClusterPool,
+ PoolEvents,
+ WorkerChoiceStrategies
+} from '../../../lib/index.cjs'
import { TaskFunctions } from '../../test-types.cjs'
-import { sleep, waitWorkerEvents } from '../../test-utils.cjs'
+import { sleep, waitPoolEvents, waitWorkerEvents } from '../../test-utils.cjs'
describe('Dynamic cluster pool test suite', () => {
const min = 1
'./tests/worker-files/thread/testWorker.mjs'
)
expect(pool.starting).toBe(false)
- expect(pool.workerNodes.length).toBe(pool.info.minSize)
- for (let run = 0; run < 4; run++) {
- // pool.enableTasksQueue(true, { concurrency: 2 })
- const maxMultiplier = 10000
- const promises = new Set()
- for (let i = 0; i < max * maxMultiplier; i++) {
- promises.add(pool.execute())
+ for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
+ pool.setWorkerChoiceStrategy(workerChoiceStrategy)
+ expect(pool.readyEventEmitted).toBe(false)
+ for (let run = 0; run < 2; run++) {
+ run % 2 !== 0 && pool.enableTasksQueue(true)
+ const maxMultiplier = 4
+ const promises = new Set()
+ expect(pool.workerNodes.length).toBe(pool.info.minSize)
+ for (let i = 0; i < max * maxMultiplier; i++) {
+ promises.add(pool.execute())
+ }
+ await Promise.all(promises)
+ expect(pool.readyEventEmitted).toBe(true)
+ expect(pool.workerNodes.length).toBeGreaterThan(pool.info.minSize)
+ expect(pool.workerNodes.length).toBeLessThanOrEqual(pool.info.maxSize)
+ await waitPoolEvents(pool, PoolEvents.empty, 1)
+ expect(pool.readyEventEmitted).toBe(false)
+ expect(pool.workerNodes.length).toBe(pool.info.minSize)
}
- await Promise.all(promises)
- expect(pool.workerNodes.length).toBe(max)
- await waitWorkerEvents(pool, 'exit', max)
- expect(pool.workerNodes.length).toBe(pool.info.minSize)
}
// We need to clean up the resources after our test
await pool.destroy()
import { expect } from 'expect'
-import { DynamicThreadPool, PoolEvents } from '../../../lib/index.cjs'
+import {
+ DynamicThreadPool,
+ PoolEvents,
+ WorkerChoiceStrategies
+} from '../../../lib/index.cjs'
import { TaskFunctions } from '../../test-types.cjs'
-import { sleep, waitWorkerEvents } from '../../test-utils.cjs'
+import { sleep, waitPoolEvents, waitWorkerEvents } from '../../test-utils.cjs'
describe('Dynamic thread pool test suite', () => {
const min = 1
'./tests/worker-files/thread/testWorker.mjs'
)
expect(pool.starting).toBe(false)
- expect(pool.workerNodes.length).toBe(pool.info.minSize)
- for (let run = 0; run < 4; run++) {
- // pool.enableTasksQueue(true, { concurrency: 2 })
- const maxMultiplier = 10000
- const promises = new Set()
- for (let i = 0; i < max * maxMultiplier; i++) {
- promises.add(pool.execute())
+ for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
+ pool.setWorkerChoiceStrategy(workerChoiceStrategy)
+ expect(pool.readyEventEmitted).toBe(false)
+ for (let run = 0; run < 2; run++) {
+ run % 2 !== 0 && pool.enableTasksQueue(true)
+ const maxMultiplier = 4
+ const promises = new Set()
+ expect(pool.workerNodes.length).toBe(pool.info.minSize)
+ for (let i = 0; i < max * maxMultiplier; i++) {
+ promises.add(pool.execute())
+ }
+ await Promise.all(promises)
+ expect(pool.readyEventEmitted).toBe(true)
+ expect(pool.workerNodes.length).toBeGreaterThan(pool.info.minSize)
+ expect(pool.workerNodes.length).toBeLessThanOrEqual(pool.info.maxSize)
+ await waitPoolEvents(pool, PoolEvents.empty, 1)
+ expect(pool.readyEventEmitted).toBe(false)
+ expect(pool.workerNodes.length).toBe(pool.info.minSize)
}
- await Promise.all(promises)
- expect(pool.workerNodes.length).toBe(max)
- await waitWorkerEvents(pool, 'exit', max)
- expect(pool.workerNodes.length).toBe(pool.info.minSize)
}
// We need to clean up the resources after our test
await pool.destroy()
await pool.destroy()
const numberOfExitEvents = await exitPromise
expect(pool.started).toBe(false)
- expect(pool.readyEventEmitted).toBe(false)
expect(pool.emitter.eventNames()).toStrictEqual([])
+ expect(pool.readyEventEmitted).toBe(false)
expect(pool.workerNodes.length).toBe(0)
expect(numberOfExitEvents).toBe(numberOfThreads)
expect(poolDestroy).toBe(1)