X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=tests%2Fpools%2Fabstract-pool.test.mjs;h=44aca3993c7d94d9f52185e553333db2316a8a71;hb=af7f2788b6fcc39343d544551cf17c8f0dc5b757;hp=1246a7d05bcc3cb06b81443fb50b9929216c9a0d;hpb=55082af96253bead6fb8d4d648c454ba71a38fb6;p=poolifier.git diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index 1246a7d0..44aca399 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -2,6 +2,7 @@ import { EventEmitterAsyncResource } from 'node:events' import { dirname, join } from 'node:path' import { readFileSync } from 'node:fs' import { fileURLToPath } from 'node:url' +import { createHook, executionAsyncId } from 'node:async_hooks' import { expect } from 'expect' import { restore, stub } from 'sinon' import { @@ -77,7 +78,10 @@ describe('Abstract pool test suite', () => { it('Verify that filePath is checked', () => { expect(() => new FixedThreadPool(numberOfWorkers)).toThrow( - new Error("Cannot find the worker file 'undefined'") + new TypeError('The worker file path must be specified') + ) + expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow( + new TypeError('The worker file path must be a string') ) expect( () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts') @@ -120,6 +124,22 @@ describe('Abstract pool test suite', () => { ) }) + it('Verify that pool arguments number and pool type are checked', () => { + expect( + () => + new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/testWorker.mjs', + undefined, + numberOfWorkers * 2 + ) + ).toThrow( + new Error( + 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization' + ) + ) + }) + it('Verify that dynamic pool sizing is checked', () => { expect( () => @@ -206,16 +226,10 @@ describe('Abstract pool test suite', () => { enableEvents: true, restartWorkerOnError: true, enableTasksQueue: false, - workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN, - workerChoiceStrategyOptions: { - retries: 6, - runTime: { median: false }, - waitTime: { median: false }, - elu: { median: false } - } + workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }) expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 6, + retries: pool.info.maxSize, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } @@ -223,7 +237,7 @@ describe('Abstract pool test suite', () => { for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ - retries: 6, + retries: pool.info.maxSize, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } @@ -260,14 +274,12 @@ describe('Abstract pool test suite', () => { concurrency: 2, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true + tasksStealingOnBackPressure: true, + tasksFinishedTimeout: 2000 }, workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED, workerChoiceStrategyOptions: { - retries: 6, runTime: { median: true }, - waitTime: { median: false }, - elu: { median: false }, weights: { 0: 300, 1: 200 } }, onlineHandler: testHandler, @@ -276,7 +288,7 @@ describe('Abstract pool test suite', () => { exitHandler: testHandler }) expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 6, + retries: pool.info.maxSize, runTime: { median: true }, waitTime: { median: false }, elu: { median: false }, @@ -285,7 +297,7 @@ describe('Abstract pool test suite', () => { for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ - retries: 6, + retries: pool.info.maxSize, runTime: { median: true }, waitTime: { median: false }, elu: { median: false }, @@ -306,38 +318,6 @@ describe('Abstract pool test suite', () => { } ) ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'")) - expect( - () => - new FixedThreadPool( - numberOfWorkers, - './tests/worker-files/thread/testWorker.mjs', - { - workerChoiceStrategyOptions: { - retries: 'invalidChoiceRetries' - } - } - ) - ).toThrow( - new TypeError( - 'Invalid worker choice strategy options: retries must be an integer' - ) - ) - expect( - () => - new FixedThreadPool( - numberOfWorkers, - './tests/worker-files/thread/testWorker.mjs', - { - workerChoiceStrategyOptions: { - retries: -1 - } - } - ) - ).toThrow( - new RangeError( - "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero" - ) - ) expect( () => new FixedThreadPool( @@ -473,14 +453,9 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testWorker.mjs', { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE } ) - expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ - retries: 6, - runTime: { median: false }, - waitTime: { median: false }, - elu: { median: false } - }) + expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined() expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 6, + retries: pool.info.maxSize, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } @@ -488,7 +463,7 @@ describe('Abstract pool test suite', () => { for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ - retries: 6, + retries: pool.info.maxSize, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } @@ -518,13 +493,11 @@ describe('Abstract pool test suite', () => { elu: { median: true } }) expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ - retries: 6, runTime: { median: true }, - waitTime: { median: false }, elu: { median: true } }) expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 6, + retries: pool.info.maxSize, runTime: { median: true }, waitTime: { median: false }, elu: { median: true } @@ -532,7 +505,7 @@ describe('Abstract pool test suite', () => { for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ - retries: 6, + retries: pool.info.maxSize, runTime: { median: true }, waitTime: { median: false }, elu: { median: true } @@ -562,13 +535,11 @@ describe('Abstract pool test suite', () => { elu: { median: false } }) expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ - retries: 6, runTime: { median: false }, - waitTime: { median: false }, elu: { median: false } }) expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 6, + retries: pool.info.maxSize, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } @@ -576,7 +547,7 @@ describe('Abstract pool test suite', () => { for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ - retries: 6, + retries: pool.info.maxSize, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } @@ -608,20 +579,6 @@ describe('Abstract pool test suite', () => { 'Invalid worker choice strategy options: must be a plain object' ) ) - expect(() => - pool.setWorkerChoiceStrategyOptions({ - retries: 'invalidChoiceRetries' - }) - ).toThrow( - new TypeError( - 'Invalid worker choice strategy options: retries must be an integer' - ) - ) - expect(() => pool.setWorkerChoiceStrategyOptions({ retries: -1 })).toThrow( - new RangeError( - "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero" - ) - ) expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow( new Error( 'Invalid worker choice strategy options: must have a weight for each worker node' @@ -650,7 +607,8 @@ describe('Abstract pool test suite', () => { concurrency: 1, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true + tasksStealingOnBackPressure: true, + tasksFinishedTimeout: 2000 }) pool.enableTasksQueue(true, { concurrency: 2 }) expect(pool.opts.enableTasksQueue).toBe(true) @@ -658,7 +616,8 @@ describe('Abstract pool test suite', () => { concurrency: 2, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true + tasksStealingOnBackPressure: true, + tasksFinishedTimeout: 2000 }) pool.enableTasksQueue(false) expect(pool.opts.enableTasksQueue).toBe(false) @@ -676,7 +635,8 @@ describe('Abstract pool test suite', () => { concurrency: 1, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true + tasksStealingOnBackPressure: true, + tasksFinishedTimeout: 2000 }) for (const workerNode of pool.workerNodes) { expect(workerNode.tasksQueueBackPressureSize).toBe( @@ -687,13 +647,15 @@ describe('Abstract pool test suite', () => { concurrency: 2, size: 2, taskStealing: false, - tasksStealingOnBackPressure: false + tasksStealingOnBackPressure: false, + tasksFinishedTimeout: 3000 }) expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2, size: 2, taskStealing: false, - tasksStealingOnBackPressure: false + tasksStealingOnBackPressure: false, + tasksFinishedTimeout: 3000 }) for (const workerNode of pool.workerNodes) { expect(workerNode.tasksQueueBackPressureSize).toBe( @@ -709,7 +671,8 @@ describe('Abstract pool test suite', () => { concurrency: 1, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true + tasksStealingOnBackPressure: true, + tasksFinishedTimeout: 2000 }) for (const workerNode of pool.workerNodes) { expect(workerNode.tasksQueueBackPressureSize).toBe( @@ -807,6 +770,7 @@ describe('Abstract pool test suite', () => { executing: 0, queued: 0, maxQueued: 0, + sequentiallyStolen: 0, stolen: 0, failed: 0 }, @@ -972,6 +936,7 @@ describe('Abstract pool test suite', () => { executing: maxMultiplier, queued: 0, maxQueued: 0, + sequentiallyStolen: 0, stolen: 0, failed: 0 }, @@ -999,6 +964,7 @@ describe('Abstract pool test suite', () => { executing: 0, queued: 0, maxQueued: 0, + sequentiallyStolen: 0, stolen: 0, failed: 0 }, @@ -1040,6 +1006,7 @@ describe('Abstract pool test suite', () => { executing: 0, queued: 0, maxQueued: 0, + sequentiallyStolen: 0, stolen: 0, failed: 0 }, @@ -1075,6 +1042,7 @@ describe('Abstract pool test suite', () => { executing: 0, queued: 0, maxQueued: 0, + sequentiallyStolen: 0, stolen: 0, failed: 0 }, @@ -1258,7 +1226,101 @@ describe('Abstract pool test suite', () => { stolenTasks: expect.any(Number), failedTasks: expect.any(Number) }) - expect(pool.hasBackPressure.called).toBe(true) + expect(pool.hasBackPressure.callCount).toBe(5) + await pool.destroy() + }) + + it('Verify that destroy() waits for queued tasks to finish', async () => { + const tasksFinishedTimeout = 2500 + const pool = new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/asyncWorker.mjs', + { + enableTasksQueue: true, + tasksQueueOptions: { tasksFinishedTimeout } + } + ) + const maxMultiplier = 4 + let tasksFinished = 0 + for (const workerNode of pool.workerNodes) { + workerNode.on('taskFinished', () => { + ++tasksFinished + }) + } + for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) { + pool.execute() + } + expect(pool.info.queuedTasks).toBeGreaterThan(0) + const startTime = performance.now() + await pool.destroy() + const elapsedTime = performance.now() - startTime + expect(tasksFinished).toBe(numberOfWorkers * maxMultiplier) + expect(elapsedTime).toBeGreaterThanOrEqual(2000) + expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 100) + }) + + it('Verify that destroy() waits until the tasks finished timeout is reached', async () => { + const tasksFinishedTimeout = 1000 + const pool = new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/asyncWorker.mjs', + { + enableTasksQueue: true, + tasksQueueOptions: { tasksFinishedTimeout } + } + ) + const maxMultiplier = 4 + let tasksFinished = 0 + for (const workerNode of pool.workerNodes) { + workerNode.on('taskFinished', () => { + ++tasksFinished + }) + } + for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) { + pool.execute() + } + expect(pool.info.queuedTasks).toBeGreaterThan(0) + const startTime = performance.now() + await pool.destroy() + const elapsedTime = performance.now() - startTime + expect(tasksFinished).toBe(0) + expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 600) + }) + + it('Verify that pool asynchronous resource track tasks execution', async () => { + let taskAsyncId + let initCalls = 0 + let beforeCalls = 0 + let afterCalls = 0 + let resolveCalls = 0 + const hook = createHook({ + init (asyncId, type) { + if (type === 'poolifier:task') { + initCalls++ + taskAsyncId = asyncId + } + }, + before (asyncId) { + if (asyncId === taskAsyncId) beforeCalls++ + }, + after (asyncId) { + if (asyncId === taskAsyncId) afterCalls++ + }, + promiseResolve () { + if (executionAsyncId() === taskAsyncId) resolveCalls++ + } + }) + const pool = new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/testWorker.mjs' + ) + hook.enable() + await pool.execute() + hook.disable() + expect(initCalls).toBe(1) + expect(beforeCalls).toBe(1) + expect(afterCalls).toBe(1) + expect(resolveCalls).toBe(1) await pool.destroy() }) @@ -1341,6 +1403,7 @@ describe('Abstract pool test suite', () => { executed: expect.any(Number), executing: 0, queued: 0, + sequentiallyStolen: 0, stolen: 0, failed: 0 }, @@ -1516,6 +1579,7 @@ describe('Abstract pool test suite', () => { executing: 0, failed: 0, queued: 0, + sequentiallyStolen: 0, stolen: 0 }, runTime: { @@ -1558,6 +1622,11 @@ describe('Abstract pool test suite', () => { await expect( pool.sendKillMessageToWorker(workerNodeKey) ).resolves.toBeUndefined() + await expect( + pool.sendKillMessageToWorker(numberOfWorkers) + ).rejects.toStrictEqual( + new Error(`Invalid worker node key '${numberOfWorkers}'`) + ) await pool.destroy() })