X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=tests%2Fpools%2Fabstract-pool.test.mjs;h=f26efdd846b5e86ae026306105f34ca030f9d4ca;hb=146eaa19f0f0ad8b0aecfef0ad3d8552dd064f33;hp=cc4b0f387993f1a414a8b380dfc69ea54700b670;hpb=32b141fddfba99a275b6e18b5abd97c7a66513be;p=poolifier.git diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index cc4b0f38..f26efdd8 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -124,6 +124,22 @@ describe('Abstract pool test suite', () => { ) }) + it('Verify that pool arguments number and pool type are checked', () => { + expect( + () => + new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/testWorker.mjs', + undefined, + numberOfWorkers * 2 + ) + ).toThrow( + new Error( + 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization' + ) + ) + }) + it('Verify that dynamic pool sizing is checked', () => { expect( () => @@ -210,28 +226,32 @@ describe('Abstract pool test suite', () => { enableEvents: true, restartWorkerOnError: true, enableTasksQueue: false, - workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN, - workerChoiceStrategyOptions: { - retries: 6, - runTime: { median: false }, - waitTime: { median: false }, - elu: { median: false } - } + workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }) expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 6, + retries: + pool.info.maxSize + + Object.keys(pool.workerChoiceStrategyContext.opts.weights).length, runTime: { median: false }, waitTime: { median: false }, - elu: { median: false } + elu: { median: false }, + weights: expect.objectContaining({ + 0: expect.any(Number), + [pool.info.maxSize - 1]: expect.any(Number) + }) }) for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext .workerChoiceStrategies) { - expect(workerChoiceStrategy.opts).toStrictEqual({ - retries: 6, - runTime: { median: false }, - waitTime: { median: false }, - elu: { median: false } - }) + expect(workerChoiceStrategy.opts).toStrictEqual( + expect.objectContaining({ + retries: + pool.info.maxSize + + Object.keys(workerChoiceStrategy.opts.weights).length, + runTime: { median: false }, + waitTime: { median: false }, + elu: { median: false } + }) + ) } await pool.destroy() const testHandler = () => console.info('test handler executed') @@ -265,14 +285,11 @@ describe('Abstract pool test suite', () => { size: Math.pow(numberOfWorkers, 2), taskStealing: true, tasksStealingOnBackPressure: true, - tasksFinishedTimeout: 1000 + tasksFinishedTimeout: 2000 }, workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED, workerChoiceStrategyOptions: { - retries: 6, runTime: { median: true }, - waitTime: { median: false }, - elu: { median: false }, weights: { 0: 300, 1: 200 } }, onlineHandler: testHandler, @@ -281,7 +298,9 @@ describe('Abstract pool test suite', () => { exitHandler: testHandler }) expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 6, + retries: + pool.info.maxSize + + Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length, runTime: { median: true }, waitTime: { median: false }, elu: { median: false }, @@ -290,7 +309,9 @@ describe('Abstract pool test suite', () => { for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ - retries: 6, + retries: + pool.info.maxSize + + Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length, runTime: { median: true }, waitTime: { median: false }, elu: { median: false }, @@ -311,38 +332,6 @@ describe('Abstract pool test suite', () => { } ) ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'")) - expect( - () => - new FixedThreadPool( - numberOfWorkers, - './tests/worker-files/thread/testWorker.mjs', - { - workerChoiceStrategyOptions: { - retries: 'invalidChoiceRetries' - } - } - ) - ).toThrow( - new TypeError( - 'Invalid worker choice strategy options: retries must be an integer' - ) - ) - expect( - () => - new FixedThreadPool( - numberOfWorkers, - './tests/worker-files/thread/testWorker.mjs', - { - workerChoiceStrategyOptions: { - retries: -1 - } - } - ) - ).toThrow( - new RangeError( - "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero" - ) - ) expect( () => new FixedThreadPool( @@ -478,26 +467,31 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testWorker.mjs', { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE } ) - expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ - retries: 6, - runTime: { median: false }, - waitTime: { median: false }, - elu: { median: false } - }) + expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined() expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 6, + retries: + pool.info.maxSize + + Object.keys(pool.workerChoiceStrategyContext.opts.weights).length, runTime: { median: false }, waitTime: { median: false }, - elu: { median: false } + elu: { median: false }, + weights: expect.objectContaining({ + 0: expect.any(Number), + [pool.info.maxSize - 1]: expect.any(Number) + }) }) for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext .workerChoiceStrategies) { - expect(workerChoiceStrategy.opts).toStrictEqual({ - retries: 6, - runTime: { median: false }, - waitTime: { median: false }, - elu: { median: false } - }) + expect(workerChoiceStrategy.opts).toStrictEqual( + expect.objectContaining({ + retries: + pool.info.maxSize + + Object.keys(workerChoiceStrategy.opts.weights).length, + runTime: { median: false }, + waitTime: { median: false }, + elu: { median: false } + }) + ) } expect( pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() @@ -523,25 +517,33 @@ describe('Abstract pool test suite', () => { elu: { median: true } }) expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ - retries: 6, runTime: { median: true }, - waitTime: { median: false }, elu: { median: true } }) expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 6, + retries: + pool.info.maxSize + + Object.keys(pool.workerChoiceStrategyContext.opts.weights).length, runTime: { median: true }, waitTime: { median: false }, - elu: { median: true } + elu: { median: true }, + weights: expect.objectContaining({ + 0: expect.any(Number), + [pool.info.maxSize - 1]: expect.any(Number) + }) }) for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext .workerChoiceStrategies) { - expect(workerChoiceStrategy.opts).toStrictEqual({ - retries: 6, - runTime: { median: true }, - waitTime: { median: false }, - elu: { median: true } - }) + expect(workerChoiceStrategy.opts).toStrictEqual( + expect.objectContaining({ + retries: + pool.info.maxSize + + Object.keys(workerChoiceStrategy.opts.weights).length, + runTime: { median: true }, + waitTime: { median: false }, + elu: { median: true } + }) + ) } expect( pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() @@ -567,25 +569,33 @@ describe('Abstract pool test suite', () => { elu: { median: false } }) expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ - retries: 6, runTime: { median: false }, - waitTime: { median: false }, elu: { median: false } }) expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 6, + retries: + pool.info.maxSize + + Object.keys(pool.workerChoiceStrategyContext.opts.weights).length, runTime: { median: false }, waitTime: { median: false }, - elu: { median: false } + elu: { median: false }, + weights: expect.objectContaining({ + 0: expect.any(Number), + [pool.info.maxSize - 1]: expect.any(Number) + }) }) for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext .workerChoiceStrategies) { - expect(workerChoiceStrategy.opts).toStrictEqual({ - retries: 6, - runTime: { median: false }, - waitTime: { median: false }, - elu: { median: false } - }) + expect(workerChoiceStrategy.opts).toStrictEqual( + expect.objectContaining({ + retries: + pool.info.maxSize + + Object.keys(workerChoiceStrategy.opts.weights).length, + runTime: { median: false }, + waitTime: { median: false }, + elu: { median: false } + }) + ) } expect( pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() @@ -613,20 +623,6 @@ describe('Abstract pool test suite', () => { 'Invalid worker choice strategy options: must be a plain object' ) ) - expect(() => - pool.setWorkerChoiceStrategyOptions({ - retries: 'invalidChoiceRetries' - }) - ).toThrow( - new TypeError( - 'Invalid worker choice strategy options: retries must be an integer' - ) - ) - expect(() => pool.setWorkerChoiceStrategyOptions({ retries: -1 })).toThrow( - new RangeError( - "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero" - ) - ) expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow( new Error( 'Invalid worker choice strategy options: must have a weight for each worker node' @@ -656,7 +652,7 @@ describe('Abstract pool test suite', () => { size: Math.pow(numberOfWorkers, 2), taskStealing: true, tasksStealingOnBackPressure: true, - tasksFinishedTimeout: 1000 + tasksFinishedTimeout: 2000 }) pool.enableTasksQueue(true, { concurrency: 2 }) expect(pool.opts.enableTasksQueue).toBe(true) @@ -665,7 +661,7 @@ describe('Abstract pool test suite', () => { size: Math.pow(numberOfWorkers, 2), taskStealing: true, tasksStealingOnBackPressure: true, - tasksFinishedTimeout: 1000 + tasksFinishedTimeout: 2000 }) pool.enableTasksQueue(false) expect(pool.opts.enableTasksQueue).toBe(false) @@ -684,7 +680,7 @@ describe('Abstract pool test suite', () => { size: Math.pow(numberOfWorkers, 2), taskStealing: true, tasksStealingOnBackPressure: true, - tasksFinishedTimeout: 1000 + tasksFinishedTimeout: 2000 }) for (const workerNode of pool.workerNodes) { expect(workerNode.tasksQueueBackPressureSize).toBe( @@ -696,14 +692,14 @@ describe('Abstract pool test suite', () => { size: 2, taskStealing: false, tasksStealingOnBackPressure: false, - tasksFinishedTimeout: 2000 + tasksFinishedTimeout: 3000 }) expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2, size: 2, taskStealing: false, tasksStealingOnBackPressure: false, - tasksFinishedTimeout: 2000 + tasksFinishedTimeout: 3000 }) for (const workerNode of pool.workerNodes) { expect(workerNode.tasksQueueBackPressureSize).toBe( @@ -720,7 +716,7 @@ describe('Abstract pool test suite', () => { size: Math.pow(numberOfWorkers, 2), taskStealing: true, tasksStealingOnBackPressure: true, - tasksFinishedTimeout: 1000 + tasksFinishedTimeout: 2000 }) for (const workerNode of pool.workerNodes) { expect(workerNode.tasksQueueBackPressureSize).toBe( @@ -1278,6 +1274,63 @@ describe('Abstract pool test suite', () => { await pool.destroy() }) + it('Verify that destroy() waits for queued tasks to finish', async () => { + const tasksFinishedTimeout = 2500 + const pool = new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/asyncWorker.mjs', + { + enableTasksQueue: true, + tasksQueueOptions: { tasksFinishedTimeout } + } + ) + const maxMultiplier = 4 + let tasksFinished = 0 + for (const workerNode of pool.workerNodes) { + workerNode.on('taskFinished', () => { + ++tasksFinished + }) + } + for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) { + pool.execute() + } + expect(pool.info.queuedTasks).toBeGreaterThan(0) + const startTime = performance.now() + await pool.destroy() + const elapsedTime = performance.now() - startTime + expect(tasksFinished).toBe(numberOfWorkers * maxMultiplier) + expect(elapsedTime).toBeGreaterThanOrEqual(2000) + expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 100) + }) + + it('Verify that destroy() waits until the tasks finished timeout is reached', async () => { + const tasksFinishedTimeout = 1000 + const pool = new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/asyncWorker.mjs', + { + enableTasksQueue: true, + tasksQueueOptions: { tasksFinishedTimeout } + } + ) + const maxMultiplier = 4 + let tasksFinished = 0 + for (const workerNode of pool.workerNodes) { + workerNode.on('taskFinished', () => { + ++tasksFinished + }) + } + for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) { + pool.execute() + } + expect(pool.info.queuedTasks).toBeGreaterThan(0) + const startTime = performance.now() + await pool.destroy() + const elapsedTime = performance.now() - startTime + expect(tasksFinished).toBe(0) + expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 600) + }) + it('Verify that pool asynchronous resource track tasks execution', async () => { let taskAsyncId let initCalls = 0 @@ -1613,6 +1666,11 @@ describe('Abstract pool test suite', () => { await expect( pool.sendKillMessageToWorker(workerNodeKey) ).resolves.toBeUndefined() + await expect( + pool.sendKillMessageToWorker(numberOfWorkers) + ).rejects.toStrictEqual( + new Error(`Invalid worker node key '${numberOfWorkers}'`) + ) await pool.destroy() })