// The `busy` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
// So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
expect(poolBusy).toBe(max + 1)
- const numberOfExitEvents = await TestUtils.waitExits(pool, max - min)
+ const numberOfExitEvents = await TestUtils.waitWorkerExits(pool, max - min)
expect(numberOfExitEvents).toBe(max - min)
})
pool.execute()
}
expect(pool.workerNodes.length).toBeGreaterThan(min)
- await TestUtils.waitExits(pool, max - min)
+ await TestUtils.waitWorkerExits(pool, max - min)
expect(pool.workerNodes.length).toBe(min)
for (let i = 0; i < max * 2; i++) {
pool.execute()
}
expect(pool.workerNodes.length).toBeGreaterThan(min)
- await TestUtils.waitExits(pool, max - min)
+ await TestUtils.waitWorkerExits(pool, max - min)
expect(pool.workerNodes.length).toBe(min)
})
it('Shutdown test', async () => {
- const exitPromise = TestUtils.waitExits(pool, min)
+ const exitPromise = TestUtils.waitWorkerExits(pool, min)
await pool.destroy()
const numberOfExitEvents = await exitPromise
expect(numberOfExitEvents).toBe(min)
longRunningPool.execute()
}
expect(longRunningPool.workerNodes.length).toBe(max)
- await TestUtils.waitExits(longRunningPool, max - min)
+ await TestUtils.waitWorkerExits(longRunningPool, max - min)
expect(longRunningPool.workerNodes.length).toBe(min)
expect(
longRunningPool.workerChoiceStrategyContext.workerChoiceStrategies.get(
it('Verify that error handling is working properly:async', async () => {
const data = { f: 10 }
- // let taskError
- // errorPool.emitter.on(PoolEvents.taskError, e => {
- // taskError = e
- // })
+ let taskError
+ asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
+ taskError = e
+ })
let inError
try {
await asyncErrorPool.execute(data)
expect(inError).toBeDefined()
expect(typeof inError === 'string').toBe(true)
expect(inError).toBe('Error Message from ClusterWorker:async')
- // expect(taskError).toStrictEqual({
- // error: 'Error Message from ClusterWorker:async',
- // errorData: data
- // })
+ expect(taskError).toStrictEqual({
+ error: 'Error Message from ClusterWorker:async',
+ errorData: data
+ })
expect(
asyncErrorPool.workerNodes.some(
workerNode => workerNode.tasksUsage.error === 1
})
it('Shutdown test', async () => {
- const exitPromise = TestUtils.waitExits(pool, numberOfWorkers)
+ const exitPromise = TestUtils.waitWorkerExits(pool, numberOfWorkers)
await pool.destroy()
const numberOfExitEvents = await exitPromise
expect(numberOfExitEvents).toBe(numberOfWorkers)
// The `busy` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
// So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
expect(poolBusy).toBe(max + 1)
- const numberOfExitEvents = await TestUtils.waitExits(pool, max - min)
+ const numberOfExitEvents = await TestUtils.waitWorkerExits(pool, max - min)
expect(numberOfExitEvents).toBe(max - min)
})
pool.execute()
}
expect(pool.workerNodes.length).toBe(max)
- await TestUtils.waitExits(pool, max - min)
+ await TestUtils.waitWorkerExits(pool, max - min)
expect(pool.workerNodes.length).toBe(min)
for (let i = 0; i < max * 2; i++) {
pool.execute()
}
expect(pool.workerNodes.length).toBe(max)
- await TestUtils.waitExits(pool, max - min)
+ await TestUtils.waitWorkerExits(pool, max - min)
expect(pool.workerNodes.length).toBe(min)
})
it('Shutdown test', async () => {
- const exitPromise = TestUtils.waitExits(pool, min)
+ const exitPromise = TestUtils.waitWorkerExits(pool, min)
await pool.destroy()
const numberOfExitEvents = await exitPromise
expect(numberOfExitEvents).toBe(min)
longRunningPool.execute()
}
expect(longRunningPool.workerNodes.length).toBe(max)
- await TestUtils.waitExits(longRunningPool, max - min)
+ await TestUtils.waitWorkerExits(longRunningPool, max - min)
expect(longRunningPool.workerNodes.length).toBe(min)
expect(
longRunningPool.workerChoiceStrategyContext.workerChoiceStrategies.get(
it('Verify that error handling is working properly:async', async () => {
const data = { f: 10 }
- // let taskError
- // errorPool.emitter.on(PoolEvents.taskError, e => {
- // taskError = e
- // })
+ let taskError
+ asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
+ taskError = e
+ })
let inError
try {
await asyncErrorPool.execute(data)
expect(inError.message).toBeDefined()
expect(typeof inError.message === 'string').toBe(true)
expect(inError.message).toBe('Error Message from ThreadWorker:async')
- // expect(taskError).toStrictEqual({
- // error: new Error('Error Message from ThreadWorker:async'),
- // errorData: data
- // })
+ expect(taskError).toStrictEqual({
+ error: new Error('Error Message from ThreadWorker:async'),
+ errorData: data
+ })
expect(
asyncErrorPool.workerNodes.some(
workerNode => workerNode.tasksUsage.error === 1
})
it('Shutdown test', async () => {
- const exitPromise = TestUtils.waitExits(pool, numberOfThreads)
+ const exitPromise = TestUtils.waitWorkerExits(pool, numberOfThreads)
await pool.destroy()
const numberOfExitEvents = await exitPromise
expect(numberOfExitEvents).toBe(numberOfThreads)
const { WorkerFunctions } = require('./test-types')
class TestUtils {
- static async waitExits (pool, numberOfExitEventsToWait) {
+ static async waitWorkerExits (pool, numberOfExitEventsToWait) {
return new Promise(resolve => {
let exitEvents = 0
if (numberOfExitEventsToWait === 0) {
})
}
+ static async waitPoolEvents (pool, poolEvent, numberOfEventsToWait) {
+ return new Promise(resolve => {
+ let events = 0
+ if (numberOfEventsToWait === 0) {
+ resolve(events)
+ }
+ pool.emitter.on(poolEvent, () => {
+ ++events
+ if (events === numberOfEventsToWait) {
+ resolve(events)
+ }
+ })
+ })
+ }
+
static async sleep (ms) {
return new Promise(resolve => setTimeout(resolve, ms))
}