X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=tests%2Fpools%2Fabstract%2Fabstract-pool.test.js;h=03bcce050f35a2f8907f0974ddf8c1c8c828937d;hb=b1aae69557f4f5c524f665e92882b76f23a19866;hp=5af1591207feca5269881d640db718b351aa66f3;hpb=90d7d101196cf9702ccc1d220dd33cca67a427b0;p=poolifier.git diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index 5af15912..03bcce05 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -1,5 +1,5 @@ -const { MessageChannel } = require('worker_threads') const { expect } = require('expect') +const sinon = require('sinon') const { DynamicClusterPool, DynamicThreadPool, @@ -168,6 +168,13 @@ describe('Abstract pool test suite', () => { WorkerChoiceStrategies.ROUND_ROBIN ) expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ + choiceRetries: 6, + runTime: { median: false }, + waitTime: { median: false }, + elu: { median: false } + }) + expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ + choiceRetries: 6, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } @@ -206,7 +213,17 @@ describe('Abstract pool test suite', () => { WorkerChoiceStrategies.LEAST_USED ) expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ + choiceRetries: 6, + runTime: { median: true }, + waitTime: { median: false }, + elu: { median: false }, + weights: { 0: 300, 1: 200 } + }) + expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ + choiceRetries: 6, runTime: { median: true }, + waitTime: { median: false }, + elu: { median: false }, weights: { 0: 300, 1: 200 } }) expect(pool.opts.messageHandler).toStrictEqual(testHandler) @@ -226,18 +243,8 @@ describe('Abstract pool test suite', () => { workerChoiceStrategy: 'invalidStrategy' } ) - ).toThrowError("Invalid worker choice strategy 'invalidStrategy'") - expect( - () => - new FixedThreadPool( - numberOfWorkers, - './tests/worker-files/thread/testWorker.js', - { - workerChoiceStrategyOptions: 'invalidOptions' - } - ) ).toThrowError( - 'Invalid worker choice strategy options: must be a plain object' + new Error("Invalid worker choice strategy 'invalidStrategy'") ) expect( () => @@ -249,7 +256,9 @@ describe('Abstract pool test suite', () => { } ) ).toThrowError( - 'Invalid worker choice strategy options: must have a weight for each worker node' + new Error( + 'Invalid worker choice strategy options: must have a weight for each worker node' + ) ) expect( () => @@ -261,7 +270,9 @@ describe('Abstract pool test suite', () => { } ) ).toThrowError( - "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'" + new Error( + "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'" + ) ) expect( () => @@ -273,7 +284,11 @@ describe('Abstract pool test suite', () => { tasksQueueOptions: { concurrency: 0 } } ) - ).toThrowError("Invalid worker tasks concurrency '0'") + ).toThrowError( + new TypeError( + 'Invalid worker tasks concurrency: 0 is a negative integer or zero' + ) + ) expect( () => new FixedThreadPool( @@ -284,7 +299,9 @@ describe('Abstract pool test suite', () => { tasksQueueOptions: 'invalidTasksQueueOptions' } ) - ).toThrowError('Invalid tasks queue options: must be a plain object') + ).toThrowError( + new TypeError('Invalid tasks queue options: must be a plain object') + ) expect( () => new FixedThreadPool( @@ -295,7 +312,9 @@ describe('Abstract pool test suite', () => { tasksQueueOptions: { concurrency: 0.2 } } ) - ).toThrowError('Invalid worker tasks concurrency: must be an integer') + ).toThrowError( + new TypeError('Invalid worker tasks concurrency: must be an integer') + ) }) it('Verify that pool worker choice strategy options can be set', async () => { @@ -305,6 +324,13 @@ describe('Abstract pool test suite', () => { { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE } ) expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ + choiceRetries: 6, + runTime: { median: false }, + waitTime: { median: false }, + elu: { median: false } + }) + expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ + choiceRetries: 6, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } @@ -312,6 +338,7 @@ describe('Abstract pool test suite', () => { for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ + choiceRetries: 6, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } @@ -341,13 +368,23 @@ describe('Abstract pool test suite', () => { elu: { median: true } }) expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ + choiceRetries: 6, runTime: { median: true }, + waitTime: { median: false }, + elu: { median: true } + }) + expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ + choiceRetries: 6, + runTime: { median: true }, + waitTime: { median: false }, elu: { median: true } }) for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ + choiceRetries: 6, runTime: { median: true }, + waitTime: { median: false }, elu: { median: true } }) } @@ -375,13 +412,23 @@ describe('Abstract pool test suite', () => { elu: { median: false } }) expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ + choiceRetries: 6, runTime: { median: false }, + waitTime: { median: false }, + elu: { median: false } + }) + expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ + choiceRetries: 6, + runTime: { median: false }, + waitTime: { median: false }, elu: { median: false } }) for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ + choiceRetries: 6, runTime: { median: false }, + waitTime: { median: false }, elu: { median: false } }) } @@ -407,17 +454,23 @@ describe('Abstract pool test suite', () => { expect(() => pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions') ).toThrowError( - 'Invalid worker choice strategy options: must be a plain object' + new TypeError( + 'Invalid worker choice strategy options: must be a plain object' + ) ) expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} }) ).toThrowError( - 'Invalid worker choice strategy options: must have a weight for each worker node' + new Error( + 'Invalid worker choice strategy options: must have a weight for each worker node' + ) ) expect(() => pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' }) ).toThrowError( - "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'" + new Error( + "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'" + ) ) await pool.destroy() }) @@ -452,12 +505,21 @@ describe('Abstract pool test suite', () => { expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 }) expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions') - ).toThrowError('Invalid tasks queue options: must be a plain object') + ).toThrowError( + new TypeError('Invalid tasks queue options: must be a plain object') + ) expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError( - "Invalid worker tasks concurrency '0'" + new Error( + 'Invalid worker tasks concurrency: 0 is a negative integer or zero' + ) + ) + expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrowError( + new Error( + 'Invalid worker tasks concurrency: -1 is a negative integer or zero' + ) ) expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError( - 'Invalid worker tasks concurrency: must be an integer' + new TypeError('Invalid worker tasks concurrency: must be an integer') ) await pool.destroy() }) @@ -588,8 +650,7 @@ describe('Abstract pool test suite', () => { id: expect.any(Number), type: WorkerTypes.thread, dynamic: false, - ready: true, - messageChannel: expect.any(MessageChannel) + ready: true }) } }) @@ -734,29 +795,60 @@ describe('Abstract pool test suite', () => { await pool.destroy() }) - it("Verify that pool event emitter 'full' event can register a callback", async () => { - const pool = new DynamicThreadPool( + it("Verify that pool event emitter 'ready' event can register a callback", async () => { + const pool = new DynamicClusterPool( Math.floor(numberOfWorkers / 2), + numberOfWorkers, + './tests/worker-files/cluster/testWorker.js' + ) + let poolInfo + let poolReady = 0 + pool.emitter.on(PoolEvents.ready, (info) => { + ++poolReady + poolInfo = info + }) + await waitPoolEvents(pool, PoolEvents.ready, 1) + expect(poolReady).toBe(1) + expect(poolInfo).toStrictEqual({ + version, + type: PoolTypes.dynamic, + worker: WorkerTypes.cluster, + ready: true, + strategy: WorkerChoiceStrategies.ROUND_ROBIN, + minSize: expect.any(Number), + maxSize: expect.any(Number), + workerNodes: expect.any(Number), + idleWorkerNodes: expect.any(Number), + busyWorkerNodes: expect.any(Number), + executedTasks: expect.any(Number), + executingTasks: expect.any(Number), + failedTasks: expect.any(Number) + }) + await pool.destroy() + }) + + it("Verify that pool event emitter 'busy' event can register a callback", async () => { + const pool = new FixedThreadPool( numberOfWorkers, './tests/worker-files/thread/testWorker.js' ) const promises = new Set() - let poolFull = 0 + let poolBusy = 0 let poolInfo - pool.emitter.on(PoolEvents.full, (info) => { - ++poolFull + pool.emitter.on(PoolEvents.busy, (info) => { + ++poolBusy poolInfo = info }) for (let i = 0; i < numberOfWorkers * 2; i++) { promises.add(pool.execute()) } await Promise.all(promises) - // The `full` event is triggered when the number of submitted tasks at once reach the maximum number of workers in the dynamic pool. - // So in total numberOfWorkers * 2 - 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = (max = numberOfWorkers) / 2. - expect(poolFull).toBe(numberOfWorkers * 2 - 1) + // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers. + // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool. + expect(poolBusy).toBe(numberOfWorkers + 1) expect(poolInfo).toStrictEqual({ version, - type: PoolTypes.dynamic, + type: PoolTypes.fixed, worker: WorkerTypes.thread, ready: expect.any(Boolean), strategy: WorkerChoiceStrategies.ROUND_ROBIN, @@ -772,25 +864,29 @@ describe('Abstract pool test suite', () => { await pool.destroy() }) - it("Verify that pool event emitter 'ready' event can register a callback", async () => { - const pool = new DynamicClusterPool( + it("Verify that pool event emitter 'full' event can register a callback", async () => { + const pool = new DynamicThreadPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/thread/testWorker.js' ) + const promises = new Set() + let poolFull = 0 let poolInfo - let poolReady = 0 - pool.emitter.on(PoolEvents.ready, (info) => { - ++poolReady + pool.emitter.on(PoolEvents.full, (info) => { + ++poolFull poolInfo = info }) - await waitPoolEvents(pool, PoolEvents.ready, 1) - expect(poolReady).toBe(1) + for (let i = 0; i < numberOfWorkers * 2; i++) { + promises.add(pool.execute()) + } + await Promise.all(promises) + expect(poolFull).toBe(1) expect(poolInfo).toStrictEqual({ version, type: PoolTypes.dynamic, - worker: WorkerTypes.cluster, - ready: true, + worker: WorkerTypes.thread, + ready: expect.any(Boolean), strategy: WorkerChoiceStrategies.ROUND_ROBIN, minSize: expect.any(Number), maxSize: expect.any(Number), @@ -804,28 +900,38 @@ describe('Abstract pool test suite', () => { await pool.destroy() }) - it("Verify that pool event emitter 'busy' event can register a callback", async () => { + it.skip("Verify that pool event emitter 'backPressure' event can register a callback", async () => { const pool = new FixedThreadPool( numberOfWorkers, - './tests/worker-files/thread/testWorker.js' + './tests/worker-files/thread/testWorker.js', + { + enableTasksQueue: true + } ) + for (const workerNode of pool.workerNodes) { + workerNode.hasBackPressure = sinon + .stub() + .onFirstCall() + .returns(true) + .returns(false) + } const promises = new Set() - let poolBusy = 0 + let poolBackPressure = 0 let poolInfo - pool.emitter.on(PoolEvents.busy, (info) => { - ++poolBusy + pool.emitter.on(PoolEvents.backPressure, (info) => { + ++poolBackPressure poolInfo = info }) for (let i = 0; i < numberOfWorkers * 2; i++) { promises.add(pool.execute()) } + // console.log(pool.info.backPressure) await Promise.all(promises) - // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers. - // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool. - expect(poolBusy).toBe(numberOfWorkers + 1) + // console.log(pool.info.backPressure) + expect(poolBackPressure).toBe(1) expect(poolInfo).toStrictEqual({ version, - type: PoolTypes.fixed, + type: PoolTypes.dynamic, worker: WorkerTypes.thread, ready: expect.any(Boolean), strategy: WorkerChoiceStrategies.ROUND_ROBIN, @@ -882,5 +988,43 @@ describe('Abstract pool test suite', () => { expect(result2).toBe(3628800) const result3 = await pool.execute(data, 'fibonacci') expect(result3).toBe(55) + expect(pool.info.executingTasks).toBe(0) + expect(pool.info.executedTasks).toBe(4) + for (const workerNode of pool.workerNodes) { + expect(workerNode.info.taskFunctions).toStrictEqual([ + 'default', + 'jsonIntegerSerialization', + 'factorial', + 'fibonacci' + ]) + expect(workerNode.taskFunctionsUsage.size).toBe(3) + for (const name of pool.listTaskFunctions()) { + expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({ + tasks: { + executed: expect.any(Number), + executing: expect.any(Number), + failed: 0, + queued: 0 + }, + runTime: { + history: expect.any(CircularArray) + }, + waitTime: { + history: expect.any(CircularArray) + }, + elu: { + idle: { + history: expect.any(CircularArray) + }, + active: { + history: expect.any(CircularArray) + } + } + }) + expect( + workerNode.getTaskFunctionWorkerUsage(name).tasks.executing + ).toBeGreaterThanOrEqual(0) + } + } }) })