X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;ds=sidebyside;f=tests%2Fpools%2Fabstract-pool.test.mjs;h=44aca3993c7d94d9f52185e553333db2316a8a71;hb=af7f2788b6fcc39343d544551cf17c8f0dc5b757;hp=5dc289359c704c51fb3feef8885eb75ff449c298;hpb=c3719753af0a9be03abf722a7543495359e817b5;p=poolifier.git diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index 5dc28935..44aca399 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -124,6 +124,22 @@ describe('Abstract pool test suite', () => { ) }) + it('Verify that pool arguments number and pool type are checked', () => { + expect( + () => + new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/testWorker.mjs', + undefined, + numberOfWorkers * 2 + ) + ).toThrow( + new Error( + 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization' + ) + ) + }) + it('Verify that dynamic pool sizing is checked', () => { expect( () => @@ -210,16 +226,10 @@ describe('Abstract pool test suite', () => { enableEvents: true, restartWorkerOnError: true, enableTasksQueue: false, - workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN, - workerChoiceStrategyOptions: { - retries: 6, - runTime: { median: false }, - waitTime: { median: false }, - elu: { median: false } - } + workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }) expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 6, + retries: pool.info.maxSize, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } @@ -227,7 +237,7 @@ describe('Abstract pool test suite', () => { for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ - retries: 6, + retries: pool.info.maxSize, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } @@ -264,14 +274,12 @@ describe('Abstract pool test suite', () => { concurrency: 2, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true + tasksStealingOnBackPressure: true, + tasksFinishedTimeout: 2000 }, workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED, workerChoiceStrategyOptions: { - retries: 6, runTime: { median: true }, - waitTime: { median: false }, - elu: { median: false }, weights: { 0: 300, 1: 200 } }, onlineHandler: testHandler, @@ -280,7 +288,7 @@ describe('Abstract pool test suite', () => { exitHandler: testHandler }) expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 6, + retries: pool.info.maxSize, runTime: { median: true }, waitTime: { median: false }, elu: { median: false }, @@ -289,7 +297,7 @@ describe('Abstract pool test suite', () => { for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ - retries: 6, + retries: pool.info.maxSize, runTime: { median: true }, waitTime: { median: false }, elu: { median: false }, @@ -310,38 +318,6 @@ describe('Abstract pool test suite', () => { } ) ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'")) - expect( - () => - new FixedThreadPool( - numberOfWorkers, - './tests/worker-files/thread/testWorker.mjs', - { - workerChoiceStrategyOptions: { - retries: 'invalidChoiceRetries' - } - } - ) - ).toThrow( - new TypeError( - 'Invalid worker choice strategy options: retries must be an integer' - ) - ) - expect( - () => - new FixedThreadPool( - numberOfWorkers, - './tests/worker-files/thread/testWorker.mjs', - { - workerChoiceStrategyOptions: { - retries: -1 - } - } - ) - ).toThrow( - new RangeError( - "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero" - ) - ) expect( () => new FixedThreadPool( @@ -477,14 +453,9 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testWorker.mjs', { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE } ) - expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ - retries: 6, - runTime: { median: false }, - waitTime: { median: false }, - elu: { median: false } - }) + expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined() expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 6, + retries: pool.info.maxSize, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } @@ -492,7 +463,7 @@ describe('Abstract pool test suite', () => { for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ - retries: 6, + retries: pool.info.maxSize, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } @@ -522,13 +493,11 @@ describe('Abstract pool test suite', () => { elu: { median: true } }) expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ - retries: 6, runTime: { median: true }, - waitTime: { median: false }, elu: { median: true } }) expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 6, + retries: pool.info.maxSize, runTime: { median: true }, waitTime: { median: false }, elu: { median: true } @@ -536,7 +505,7 @@ describe('Abstract pool test suite', () => { for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ - retries: 6, + retries: pool.info.maxSize, runTime: { median: true }, waitTime: { median: false }, elu: { median: true } @@ -566,13 +535,11 @@ describe('Abstract pool test suite', () => { elu: { median: false } }) expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ - retries: 6, runTime: { median: false }, - waitTime: { median: false }, elu: { median: false } }) expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 6, + retries: pool.info.maxSize, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } @@ -580,7 +547,7 @@ describe('Abstract pool test suite', () => { for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ - retries: 6, + retries: pool.info.maxSize, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } @@ -612,20 +579,6 @@ describe('Abstract pool test suite', () => { 'Invalid worker choice strategy options: must be a plain object' ) ) - expect(() => - pool.setWorkerChoiceStrategyOptions({ - retries: 'invalidChoiceRetries' - }) - ).toThrow( - new TypeError( - 'Invalid worker choice strategy options: retries must be an integer' - ) - ) - expect(() => pool.setWorkerChoiceStrategyOptions({ retries: -1 })).toThrow( - new RangeError( - "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero" - ) - ) expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow( new Error( 'Invalid worker choice strategy options: must have a weight for each worker node' @@ -654,7 +607,8 @@ describe('Abstract pool test suite', () => { concurrency: 1, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true + tasksStealingOnBackPressure: true, + tasksFinishedTimeout: 2000 }) pool.enableTasksQueue(true, { concurrency: 2 }) expect(pool.opts.enableTasksQueue).toBe(true) @@ -662,7 +616,8 @@ describe('Abstract pool test suite', () => { concurrency: 2, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true + tasksStealingOnBackPressure: true, + tasksFinishedTimeout: 2000 }) pool.enableTasksQueue(false) expect(pool.opts.enableTasksQueue).toBe(false) @@ -680,7 +635,8 @@ describe('Abstract pool test suite', () => { concurrency: 1, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true + tasksStealingOnBackPressure: true, + tasksFinishedTimeout: 2000 }) for (const workerNode of pool.workerNodes) { expect(workerNode.tasksQueueBackPressureSize).toBe( @@ -691,13 +647,15 @@ describe('Abstract pool test suite', () => { concurrency: 2, size: 2, taskStealing: false, - tasksStealingOnBackPressure: false + tasksStealingOnBackPressure: false, + tasksFinishedTimeout: 3000 }) expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2, size: 2, taskStealing: false, - tasksStealingOnBackPressure: false + tasksStealingOnBackPressure: false, + tasksFinishedTimeout: 3000 }) for (const workerNode of pool.workerNodes) { expect(workerNode.tasksQueueBackPressureSize).toBe( @@ -713,7 +671,8 @@ describe('Abstract pool test suite', () => { concurrency: 1, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true + tasksStealingOnBackPressure: true, + tasksFinishedTimeout: 2000 }) for (const workerNode of pool.workerNodes) { expect(workerNode.tasksQueueBackPressureSize).toBe( @@ -1267,8 +1226,65 @@ describe('Abstract pool test suite', () => { stolenTasks: expect.any(Number), failedTasks: expect.any(Number) }) - expect(pool.hasBackPressure.called).toBe(true) + expect(pool.hasBackPressure.callCount).toBe(5) + await pool.destroy() + }) + + it('Verify that destroy() waits for queued tasks to finish', async () => { + const tasksFinishedTimeout = 2500 + const pool = new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/asyncWorker.mjs', + { + enableTasksQueue: true, + tasksQueueOptions: { tasksFinishedTimeout } + } + ) + const maxMultiplier = 4 + let tasksFinished = 0 + for (const workerNode of pool.workerNodes) { + workerNode.on('taskFinished', () => { + ++tasksFinished + }) + } + for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) { + pool.execute() + } + expect(pool.info.queuedTasks).toBeGreaterThan(0) + const startTime = performance.now() + await pool.destroy() + const elapsedTime = performance.now() - startTime + expect(tasksFinished).toBe(numberOfWorkers * maxMultiplier) + expect(elapsedTime).toBeGreaterThanOrEqual(2000) + expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 100) + }) + + it('Verify that destroy() waits until the tasks finished timeout is reached', async () => { + const tasksFinishedTimeout = 1000 + const pool = new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/asyncWorker.mjs', + { + enableTasksQueue: true, + tasksQueueOptions: { tasksFinishedTimeout } + } + ) + const maxMultiplier = 4 + let tasksFinished = 0 + for (const workerNode of pool.workerNodes) { + workerNode.on('taskFinished', () => { + ++tasksFinished + }) + } + for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) { + pool.execute() + } + expect(pool.info.queuedTasks).toBeGreaterThan(0) + const startTime = performance.now() await pool.destroy() + const elapsedTime = performance.now() - startTime + expect(tasksFinished).toBe(0) + expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 600) }) it('Verify that pool asynchronous resource track tasks execution', async () => { @@ -1606,6 +1622,11 @@ describe('Abstract pool test suite', () => { await expect( pool.sendKillMessageToWorker(workerNodeKey) ).resolves.toBeUndefined() + await expect( + pool.sendKillMessageToWorker(numberOfWorkers) + ).rejects.toStrictEqual( + new Error(`Invalid worker node key '${numberOfWorkers}'`) + ) await pool.destroy() })