)
expect(pool.opts.enableTasksQueue).toBe(false)
expect(pool.opts.tasksQueueOptions).toBeUndefined()
- for (const workerNode of pool.workerNodes) {
- expect(workerNode.onEmptyQueue).toBeUndefined()
- expect(workerNode.onBackPressure).toBeUndefined()
- }
pool.enableTasksQueue(true)
expect(pool.opts.enableTasksQueue).toBe(true)
expect(pool.opts.tasksQueueOptions).toStrictEqual({
taskStealing: true,
tasksStealingOnBackPressure: true
})
- for (const workerNode of pool.workerNodes) {
- expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
- expect(workerNode.onBackPressure).toBeInstanceOf(Function)
- }
pool.enableTasksQueue(true, { concurrency: 2 })
expect(pool.opts.enableTasksQueue).toBe(true)
expect(pool.opts.tasksQueueOptions).toStrictEqual({
taskStealing: true,
tasksStealingOnBackPressure: true
})
- for (const workerNode of pool.workerNodes) {
- expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
- expect(workerNode.onBackPressure).toBeInstanceOf(Function)
- }
pool.enableTasksQueue(false)
expect(pool.opts.enableTasksQueue).toBe(false)
expect(pool.opts.tasksQueueOptions).toBeUndefined()
- for (const workerNode of pool.workerNodes) {
- expect(workerNode.onEmptyQueue).toBeUndefined()
- expect(workerNode.onBackPressure).toBeUndefined()
- }
await pool.destroy()
})
expect(workerNode.tasksQueueBackPressureSize).toBe(
pool.opts.tasksQueueOptions.size
)
- expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
- expect(workerNode.onBackPressure).toBeInstanceOf(Function)
}
pool.setTasksQueueOptions({
concurrency: 2,
expect(workerNode.tasksQueueBackPressureSize).toBe(
pool.opts.tasksQueueOptions.size
)
- expect(workerNode.onEmptyQueue).toBeUndefined()
- expect(workerNode.onBackPressure).toBeUndefined()
}
pool.setTasksQueueOptions({
concurrency: 1,
expect(workerNode.tasksQueueBackPressureSize).toBe(
pool.opts.tasksQueueOptions.size
)
- expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
- expect(workerNode.onBackPressure).toBeInstanceOf(Function)
}
expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
new TypeError('Invalid tasks queue options: must be a plain object')
'jsonIntegerSerialization',
'factorial'
])
+ await dynamicThreadPool.destroy()
})
it('Verify that multiple task functions worker is working', async () => {