Response
>
+ /**
+ * Dynamic pool maximum size property placeholder.
+ */
+ protected readonly max?: number
+
/**
* Whether the pool is starting or not.
*/
this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
this.executeTask = this.executeTask.bind(this)
this.enqueueTask = this.enqueueTask.bind(this)
- this.dequeueTask = this.dequeueTask.bind(this)
- this.checkAndEmitTaskExecutionEvents =
- this.checkAndEmitTaskExecutionEvents.bind(this)
- this.checkAndEmitTaskQueuingEvents =
- this.checkAndEmitTaskQueuingEvents.bind(this)
- this.checkAndEmitDynamicWorkerCreationEvents =
- this.checkAndEmitDynamicWorkerCreationEvents.bind(this)
if (this.opts.enableEvents === true) {
this.emitter = new PoolEmitter()
tasksQueueOptions.concurrency <= 0
) {
throw new Error(
- `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}' is a negative integer or zero`
+ `Invalid worker tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
)
}
}
/**
* The pool minimum size.
*/
- protected abstract get minSize (): number
+ protected get minSize (): number {
+ return this.numberOfWorkers
+ }
/**
* The pool maximum size.
*/
- protected abstract get maxSize (): number
+ protected get maxSize (): number {
+ return this.max ?? this.numberOfWorkers
+ }
/**
* Checks if the worker id sent in the received message from a worker is valid.
* @param poolMaxSize - The pool maximum size.
*/
constructor (worker: Worker, workerType: WorkerType, poolMaxSize: number) {
+ if (worker == null) {
+ throw new Error('Cannot construct a worker node without a worker')
+ }
+ if (workerType == null) {
+ throw new Error('Cannot construct a worker node without a worker type')
+ }
+ if (poolMaxSize == null) {
+ throw new Error(
+ 'Cannot construct a worker node without a pool maximum size'
+ )
+ }
+ if (isNaN(poolMaxSize)) {
+ throw new Error(
+ 'Cannot construct a worker node with a NaN pool maximum size'
+ )
+ }
this.worker = worker
this.info = this.initWorkerInfo(worker, workerType)
if (workerType === WorkerTypes.thread) {
/** @inheritdoc */
public hasBackPressure (): boolean {
- return this.tasksQueueSize() >= this.tasksQueueBackPressureSize
+ return this.tasksQueue.size >= this.tasksQueueBackPressureSize
}
/** @inheritdoc */
workerChoiceStrategy: 'invalidStrategy'
}
)
- ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
+ ).toThrowError(
+ new Error("Invalid worker choice strategy 'invalidStrategy'")
+ )
expect(
() =>
new FixedThreadPool(
}
)
).toThrowError(
- 'Invalid worker choice strategy options: must have a weight for each worker node'
+ new Error(
+ 'Invalid worker choice strategy options: must have a weight for each worker node'
+ )
)
expect(
() =>
}
)
).toThrowError(
- "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
+ new Error(
+ "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
+ )
)
expect(
() =>
tasksQueueOptions: { concurrency: 0 }
}
)
- ).toThrowError("Invalid worker tasks concurrency '0'")
+ ).toThrowError(
+ new TypeError(
+ 'Invalid worker tasks concurrency: 0 is a negative integer or zero'
+ )
+ )
expect(
() =>
new FixedThreadPool(
tasksQueueOptions: 'invalidTasksQueueOptions'
}
)
- ).toThrowError('Invalid tasks queue options: must be a plain object')
+ ).toThrowError(
+ new TypeError('Invalid tasks queue options: must be a plain object')
+ )
expect(
() =>
new FixedThreadPool(
tasksQueueOptions: { concurrency: 0.2 }
}
)
- ).toThrowError('Invalid worker tasks concurrency: must be an integer')
+ ).toThrowError(
+ new TypeError('Invalid worker tasks concurrency: must be an integer')
+ )
})
it('Verify that pool worker choice strategy options can be set', async () => {
expect(() =>
pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
).toThrowError(
- 'Invalid worker choice strategy options: must be a plain object'
+ new TypeError(
+ 'Invalid worker choice strategy options: must be a plain object'
+ )
)
expect(() =>
pool.setWorkerChoiceStrategyOptions({ weights: {} })
).toThrowError(
- 'Invalid worker choice strategy options: must have a weight for each worker node'
+ new Error(
+ 'Invalid worker choice strategy options: must have a weight for each worker node'
+ )
)
expect(() =>
pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
).toThrowError(
- "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
+ new Error(
+ "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
+ )
)
await pool.destroy()
})
expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
expect(() =>
pool.setTasksQueueOptions('invalidTasksQueueOptions')
- ).toThrowError('Invalid tasks queue options: must be a plain object')
+ ).toThrowError(
+ new TypeError('Invalid tasks queue options: must be a plain object')
+ )
expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError(
- "Invalid worker tasks concurrency '0'"
+ new Error(
+ 'Invalid worker tasks concurrency: 0 is a negative integer or zero'
+ )
+ )
+ expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrowError(
+ new Error(
+ 'Invalid worker tasks concurrency: -1 is a negative integer or zero'
+ )
)
expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError(
- 'Invalid worker tasks concurrency: must be an integer'
+ new TypeError('Invalid worker tasks concurrency: must be an integer')
)
await pool.destroy()
})
await pool.destroy()
})
+ it.skip("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
+ const pool = new DynamicThreadPool(
+ Math.floor(numberOfWorkers / 2),
+ numberOfWorkers,
+ './tests/worker-files/thread/testWorker.js',
+ {
+ enableTasksQueue: true
+ }
+ )
+ const promises = new Set()
+ let poolBackPressure = 0
+ let poolInfo
+ pool.emitter.on(PoolEvents.backPressure, (info) => {
+ ++poolBackPressure
+ poolInfo = info
+ })
+ for (let i = 0; i < Math.pow(numberOfWorkers, 2); i++) {
+ promises.add(pool.execute())
+ }
+ await Promise.all(promises)
+ expect(poolBackPressure).toBe(1)
+ expect(poolInfo).toStrictEqual({
+ version,
+ type: PoolTypes.dynamic,
+ worker: WorkerTypes.thread,
+ ready: expect.any(Boolean),
+ strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ minSize: expect.any(Number),
+ maxSize: expect.any(Number),
+ workerNodes: expect.any(Number),
+ idleWorkerNodes: expect.any(Number),
+ busyWorkerNodes: expect.any(Number),
+ executedTasks: expect.any(Number),
+ executingTasks: expect.any(Number),
+ failedTasks: expect.any(Number)
+ })
+ await pool.destroy()
+ })
+
it('Verify that listTaskFunctions() is working', async () => {
const dynamicThreadPool = new DynamicThreadPool(
Math.floor(numberOfWorkers / 2),