FixedThreadPool,
PoolEvents,
WorkerChoiceStrategies
-} = require('../../../lib/index')
+} = require('../../../lib')
const { CircularArray } = require('../../../lib/circular-array')
+const { Queue } = require('../../../lib/queue')
describe('Abstract pool test suite', () => {
const numberOfWorkers = 1
await pool.destroy()
})
- it('Verify that pool options are valid', async () => {
+ it('Verify that pool options are validated', async () => {
expect(
() =>
new FixedThreadPool(
).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
})
+ it('Verify that worker choice strategy options can be set', async () => {
+ const pool = new FixedThreadPool(
+ numberOfWorkers,
+ './tests/worker-files/thread/testWorker.js',
+ { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
+ )
+ expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
+ medRunTime: false
+ })
+ for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
+ .workerChoiceStrategies) {
+ expect(workerChoiceStrategy.opts).toStrictEqual({ medRunTime: false })
+ }
+ expect(
+ pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime
+ ).toBe(true)
+ expect(
+ pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime
+ ).toBe(false)
+ pool.setWorkerChoiceStrategyOptions({ medRunTime: true })
+ expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
+ medRunTime: true
+ })
+ for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
+ .workerChoiceStrategies) {
+ expect(workerChoiceStrategy.opts).toStrictEqual({ medRunTime: true })
+ }
+ expect(
+ pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime
+ ).toBe(false)
+ expect(
+ pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime
+ ).toBe(true)
+ pool.setWorkerChoiceStrategyOptions({ medRunTime: false })
+ expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
+ medRunTime: false
+ })
+ for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
+ .workerChoiceStrategies) {
+ expect(workerChoiceStrategy.opts).toStrictEqual({ medRunTime: false })
+ }
+ expect(
+ pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime
+ ).toBe(true)
+ expect(
+ pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime
+ ).toBe(false)
+ await pool.destroy()
+ })
+
+ it('Verify that tasks queue can be enabled/disabled', async () => {
+ const pool = new FixedThreadPool(
+ numberOfWorkers,
+ './tests/worker-files/thread/testWorker.js'
+ )
+ expect(pool.opts.enableTasksQueue).toBe(false)
+ expect(pool.opts.tasksQueueOptions).toBeUndefined()
+ pool.enableTasksQueue(true)
+ expect(pool.opts.enableTasksQueue).toBe(true)
+ expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
+ pool.enableTasksQueue(true, { concurrency: 2 })
+ expect(pool.opts.enableTasksQueue).toBe(true)
+ expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
+ pool.enableTasksQueue(false)
+ expect(pool.opts.enableTasksQueue).toBe(false)
+ expect(pool.opts.tasksQueueOptions).toBeUndefined()
+ await pool.destroy()
+ })
+
+ it('Verify that tasks queue options can be set', async () => {
+ const pool = new FixedThreadPool(
+ numberOfWorkers,
+ './tests/worker-files/thread/testWorker.js',
+ { enableTasksQueue: true }
+ )
+ expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
+ pool.setTasksQueueOptions({ concurrency: 2 })
+ expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
+ expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError(
+ "Invalid worker tasks concurrency '0'"
+ )
+ await pool.destroy()
+ })
+
it('Simulate worker not found at getWorkerTasksUsage()', async () => {
const pool = new StubPoolWithRemoveAllWorker(
numberOfWorkers,
)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksQueue).toBeDefined()
- expect(workerNode.tasksQueue).toBeInstanceOf(Array)
- expect(workerNode.tasksQueue.length).toBe(0)
+ expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
+ expect(workerNode.tasksQueue.size()).toBe(0)
}
await pool.destroy()
})