X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=tests%2Fpools%2Fabstract%2Fabstract-pool.test.js;h=933e1f944f863bdbcdb4110c1d75359e8f571b4c;hb=68cbdc846878bc058323b757a68b4c83eedc6388;hp=0ecd39440efb43718400eeed0e3ef2b92e357736;hpb=33e6bb4c6f1061542693dbd996ffc08c62e8beeb;p=poolifier.git diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index 0ecd3944..933e1f94 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -1,4 +1,5 @@ const { expect } = require('expect') +const sinon = require('sinon') const { DynamicClusterPool, DynamicThreadPool, @@ -10,7 +11,7 @@ const { WorkerTypes } = require('../../../lib') const { CircularArray } = require('../../../lib/circular-array') -const { Queue } = require('../../../lib/queue') +const { Deque } = require('../../../lib/deque') const { version } = require('../../../package.json') const { waitPoolEvents } = require('../../test-utils') @@ -22,6 +23,10 @@ describe('Abstract pool test suite', () => { } } + afterEach(() => { + sinon.restore() + }) + it('Simulate pool creation from a non main thread/process', () => { expect( () => @@ -33,7 +38,9 @@ describe('Abstract pool test suite', () => { } ) ).toThrowError( - 'Cannot start a pool from a worker with the same type as the pool' + new Error( + 'Cannot start a pool from a worker with the same type as the pool' + ) ) }) @@ -60,7 +67,9 @@ describe('Abstract pool test suite', () => { it('Verify that numberOfWorkers is checked', () => { expect(() => new FixedThreadPool()).toThrowError( - 'Cannot instantiate a pool without specifying the number of workers' + new Error( + 'Cannot instantiate a pool without specifying the number of workers' + ) ) }) @@ -207,7 +216,10 @@ describe('Abstract pool test suite', () => { expect(pool.opts.enableEvents).toBe(false) expect(pool.opts.restartWorkerOnError).toBe(false) expect(pool.opts.enableTasksQueue).toBe(true) - expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 }) + expect(pool.opts.tasksQueueOptions).toStrictEqual({ + concurrency: 2, + size: 4 + }) expect(pool.opts.workerChoiceStrategy).toBe( WorkerChoiceStrategies.LEAST_USED ) @@ -242,7 +254,9 @@ describe('Abstract pool test suite', () => { workerChoiceStrategy: 'invalidStrategy' } ) - ).toThrowError("Invalid worker choice strategy 'invalidStrategy'") + ).toThrowError( + new Error("Invalid worker choice strategy 'invalidStrategy'") + ) expect( () => new FixedThreadPool( @@ -253,7 +267,9 @@ describe('Abstract pool test suite', () => { } ) ).toThrowError( - 'Invalid worker choice strategy options: must have a weight for each worker node' + new Error( + 'Invalid worker choice strategy options: must have a weight for each worker node' + ) ) expect( () => @@ -265,7 +281,9 @@ describe('Abstract pool test suite', () => { } ) ).toThrowError( - "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'" + new Error( + "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'" + ) ) expect( () => @@ -277,7 +295,11 @@ describe('Abstract pool test suite', () => { tasksQueueOptions: { concurrency: 0 } } ) - ).toThrowError("Invalid worker tasks concurrency '0'") + ).toThrowError( + new RangeError( + 'Invalid worker node tasks concurrency: 0 is a negative integer or zero' + ) + ) expect( () => new FixedThreadPool( @@ -288,7 +310,9 @@ describe('Abstract pool test suite', () => { tasksQueueOptions: 'invalidTasksQueueOptions' } ) - ).toThrowError('Invalid tasks queue options: must be a plain object') + ).toThrowError( + new TypeError('Invalid tasks queue options: must be a plain object') + ) expect( () => new FixedThreadPool( @@ -299,7 +323,9 @@ describe('Abstract pool test suite', () => { tasksQueueOptions: { concurrency: 0.2 } } ) - ).toThrowError('Invalid worker tasks concurrency: must be an integer') + ).toThrowError( + new TypeError('Invalid worker node tasks concurrency: must be an integer') + ) }) it('Verify that pool worker choice strategy options can be set', async () => { @@ -439,17 +465,23 @@ describe('Abstract pool test suite', () => { expect(() => pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions') ).toThrowError( - 'Invalid worker choice strategy options: must be a plain object' + new TypeError( + 'Invalid worker choice strategy options: must be a plain object' + ) ) expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} }) ).toThrowError( - 'Invalid worker choice strategy options: must have a weight for each worker node' + new Error( + 'Invalid worker choice strategy options: must have a weight for each worker node' + ) ) expect(() => pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' }) ).toThrowError( - "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'" + new Error( + "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'" + ) ) await pool.destroy() }) @@ -463,10 +495,16 @@ describe('Abstract pool test suite', () => { expect(pool.opts.tasksQueueOptions).toBeUndefined() pool.enableTasksQueue(true) expect(pool.opts.enableTasksQueue).toBe(true) - expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 }) + expect(pool.opts.tasksQueueOptions).toStrictEqual({ + concurrency: 1, + size: 4 + }) pool.enableTasksQueue(true, { concurrency: 2 }) expect(pool.opts.enableTasksQueue).toBe(true) - expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 }) + expect(pool.opts.tasksQueueOptions).toStrictEqual({ + concurrency: 2, + size: 4 + }) pool.enableTasksQueue(false) expect(pool.opts.enableTasksQueue).toBe(false) expect(pool.opts.tasksQueueOptions).toBeUndefined() @@ -479,17 +517,47 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testWorker.js', { enableTasksQueue: true } ) - expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 }) + expect(pool.opts.tasksQueueOptions).toStrictEqual({ + concurrency: 1, + size: 4 + }) pool.setTasksQueueOptions({ concurrency: 2 }) - expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 }) + expect(pool.opts.tasksQueueOptions).toStrictEqual({ + concurrency: 2, + size: 4 + }) expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions') - ).toThrowError('Invalid tasks queue options: must be a plain object') + ).toThrowError( + new TypeError('Invalid tasks queue options: must be a plain object') + ) expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError( - "Invalid worker tasks concurrency '0'" + new RangeError( + 'Invalid worker node tasks concurrency: 0 is a negative integer or zero' + ) + ) + expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrowError( + new RangeError( + 'Invalid worker node tasks concurrency: -1 is a negative integer or zero' + ) ) expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError( - 'Invalid worker tasks concurrency: must be an integer' + new TypeError('Invalid worker node tasks concurrency: must be an integer') + ) + expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrowError( + new RangeError( + 'Invalid worker node tasks queue max size: 0 is a negative integer or zero' + ) + ) + expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrowError( + new RangeError( + 'Invalid worker node tasks queue max size: -1 is a negative integer or zero' + ) + ) + expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrowError( + new TypeError( + 'Invalid worker node tasks queue max size: must be an integer' + ) ) await pool.destroy() }) @@ -550,6 +618,7 @@ describe('Abstract pool test suite', () => { executing: 0, queued: 0, maxQueued: 0, + stolen: 0, failed: 0 }, runTime: { @@ -578,7 +647,7 @@ describe('Abstract pool test suite', () => { ) for (const workerNode of pool.workerNodes) { expect(workerNode.tasksQueue).toBeDefined() - expect(workerNode.tasksQueue).toBeInstanceOf(Queue) + expect(workerNode.tasksQueue).toBeInstanceOf(Deque) expect(workerNode.tasksQueue.size).toBe(0) expect(workerNode.tasksQueue.maxSize).toBe(0) } @@ -590,7 +659,7 @@ describe('Abstract pool test suite', () => { ) for (const workerNode of pool.workerNodes) { expect(workerNode.tasksQueue).toBeDefined() - expect(workerNode.tasksQueue).toBeInstanceOf(Queue) + expect(workerNode.tasksQueue).toBeInstanceOf(Deque) expect(workerNode.tasksQueue.size).toBe(0) expect(workerNode.tasksQueue.maxSize).toBe(0) } @@ -642,6 +711,7 @@ describe('Abstract pool test suite', () => { executing: maxMultiplier, queued: 0, maxQueued: 0, + stolen: 0, failed: 0 }, runTime: { @@ -668,6 +738,7 @@ describe('Abstract pool test suite', () => { executing: 0, queued: 0, maxQueued: 0, + stolen: 0, failed: 0 }, runTime: { @@ -708,6 +779,7 @@ describe('Abstract pool test suite', () => { executing: 0, queued: 0, maxQueued: 0, + stolen: 0, failed: 0 }, runTime: { @@ -726,7 +798,9 @@ describe('Abstract pool test suite', () => { } }) expect(workerNode.usage.tasks.executed).toBeGreaterThan(0) - expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(maxMultiplier) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + numberOfWorkers * maxMultiplier + ) expect(workerNode.usage.runTime.history.length).toBe(0) expect(workerNode.usage.waitTime.history.length).toBe(0) expect(workerNode.usage.elu.idle.history.length).toBe(0) @@ -740,6 +814,7 @@ describe('Abstract pool test suite', () => { executing: 0, queued: 0, maxQueued: 0, + stolen: 0, failed: 0 }, runTime: { @@ -870,6 +945,50 @@ describe('Abstract pool test suite', () => { await pool.destroy() }) + it("Verify that pool event emitter 'backPressure' event can register a callback", async () => { + const pool = new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/testWorker.js', + { + enableTasksQueue: true + } + ) + sinon.stub(pool, 'hasBackPressure').returns(true) + const promises = new Set() + let poolBackPressure = 0 + let poolInfo + pool.emitter.on(PoolEvents.backPressure, (info) => { + ++poolBackPressure + poolInfo = info + }) + for (let i = 0; i < numberOfWorkers + 1; i++) { + promises.add(pool.execute()) + } + await Promise.all(promises) + expect(poolBackPressure).toBe(1) + expect(poolInfo).toStrictEqual({ + version, + type: PoolTypes.fixed, + worker: WorkerTypes.thread, + ready: expect.any(Boolean), + strategy: WorkerChoiceStrategies.ROUND_ROBIN, + minSize: expect.any(Number), + maxSize: expect.any(Number), + workerNodes: expect.any(Number), + idleWorkerNodes: expect.any(Number), + busyWorkerNodes: expect.any(Number), + executedTasks: expect.any(Number), + executingTasks: expect.any(Number), + maxQueuedTasks: expect.any(Number), + queuedTasks: expect.any(Number), + backPressure: true, + stolenTasks: expect.any(Number), + failedTasks: expect.any(Number) + }) + expect(pool.hasBackPressure.called).toBe(true) + await pool.destroy() + }) + it('Verify that listTaskFunctions() is working', async () => { const dynamicThreadPool = new DynamicThreadPool( Math.floor(numberOfWorkers / 2), @@ -927,7 +1046,8 @@ describe('Abstract pool test suite', () => { executed: expect.any(Number), executing: expect.any(Number), failed: 0, - queued: 0 + queued: 0, + stolen: 0 }, runTime: { history: expect.any(CircularArray)