size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
tasksStealingOnBackPressure: true,
- tasksFinishedTimeout: 1000
+ tasksFinishedTimeout: 2000
},
workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
workerChoiceStrategyOptions: {
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
tasksStealingOnBackPressure: true,
- tasksFinishedTimeout: 1000
+ tasksFinishedTimeout: 2000
})
pool.enableTasksQueue(true, { concurrency: 2 })
expect(pool.opts.enableTasksQueue).toBe(true)
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
tasksStealingOnBackPressure: true,
- tasksFinishedTimeout: 1000
+ tasksFinishedTimeout: 2000
})
pool.enableTasksQueue(false)
expect(pool.opts.enableTasksQueue).toBe(false)
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
tasksStealingOnBackPressure: true,
- tasksFinishedTimeout: 1000
+ tasksFinishedTimeout: 2000
})
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksQueueBackPressureSize).toBe(
size: 2,
taskStealing: false,
tasksStealingOnBackPressure: false,
- tasksFinishedTimeout: 2000
+ tasksFinishedTimeout: 3000
})
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 2,
size: 2,
taskStealing: false,
tasksStealingOnBackPressure: false,
- tasksFinishedTimeout: 2000
+ tasksFinishedTimeout: 3000
})
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksQueueBackPressureSize).toBe(
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
tasksStealingOnBackPressure: true,
- tasksFinishedTimeout: 1000
+ tasksFinishedTimeout: 2000
})
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksQueueBackPressureSize).toBe(
await pool.destroy()
})
+ it('Verify that destroy() waits for queued tasks to finish', async () => {
+ // Timeout chosen to be longer than a single async task's duration, so
+ // destroy() has enough time to drain the queue before giving up.
+ const tasksFinishedTimeout = 2500
+ const pool = new FixedThreadPool(
+ numberOfWorkers,
+ './tests/worker-files/thread/asyncWorker.mjs',
+ {
+ enableTasksQueue: true,
+ tasksQueueOptions: { tasksFinishedTimeout }
+ }
+ )
+ const maxMultiplier = 4
+ let tasksFinished = 0
+ // Count every task completion across all worker nodes.
+ for (const workerNode of pool.workerNodes) {
+ workerNode.on('taskFinished', () => {
+ ++tasksFinished
+ })
+ }
+ // Fire-and-forget executions (promises intentionally not awaited) to
+ // saturate the workers and force tasks into the queue.
+ for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
+ pool.execute()
+ }
+ expect(pool.info.queuedTasks).toBeGreaterThan(0)
+ const startTime = performance.now()
+ await pool.destroy()
+ const elapsedTime = performance.now() - startTime
+ // Every submitted task must have finished before destroy() resolved.
+ expect(tasksFinished).toBe(numberOfWorkers * maxMultiplier)
+ // 2000 presumably matches the async worker task duration in
+ // asyncWorker.mjs — TODO confirm; destroy() must wait at least that long
+ // but no longer than the configured timeout plus a small scheduling margin.
+ expect(elapsedTime).toBeGreaterThanOrEqual(2000)
+ expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 100)
+ })
+
+ it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
+ // Timeout deliberately shorter than a task's duration, so destroy()
+ // must give up waiting before any queued task can complete.
+ const tasksFinishedTimeout = 1000
+ const pool = new FixedThreadPool(
+ numberOfWorkers,
+ './tests/worker-files/thread/asyncWorker.mjs',
+ {
+ enableTasksQueue: true,
+ tasksQueueOptions: { tasksFinishedTimeout }
+ }
+ )
+ const maxMultiplier = 4
+ let tasksFinished = 0
+ // Count every task completion across all worker nodes.
+ for (const workerNode of pool.workerNodes) {
+ workerNode.on('taskFinished', () => {
+ ++tasksFinished
+ })
+ }
+ // Fire-and-forget executions (promises intentionally not awaited) to
+ // saturate the workers and force tasks into the queue.
+ for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
+ pool.execute()
+ }
+ expect(pool.info.queuedTasks).toBeGreaterThan(0)
+ const startTime = performance.now()
+ await pool.destroy()
+ const elapsedTime = performance.now() - startTime
+ // No task can have finished: the timeout expires before task completion,
+ // and destroy() must return promptly once it is reached (+ margin).
+ expect(tasksFinished).toBe(0)
+ expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 300)
+ })
+
it('Verify that pool asynchronous resource track tasks execution', async () => {
let taskAsyncId
let initCalls = 0
await expect(
pool.sendKillMessageToWorker(workerNodeKey)
).resolves.toBeUndefined()
+ await expect(
+ pool.sendKillMessageToWorker(numberOfWorkers)
+ ).rejects.toStrictEqual(
+ new Error(`Invalid worker node key '${numberOfWorkers}'`)
+ )
await pool.destroy()
})