X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=tests%2Fpools%2Fabstract-pool.test.mjs;h=84a0195e0cfd3b1aaef9b818476891b11a61cbc5;hb=e372f3935374fbbea3a5d3acc72813b36ced86ff;hp=1b80aa4cdeb92efd76ed0c5f4e56e7357342c8a0;hpb=568d0075825104b5a5ccc38dea2beeb55b55f3d0;p=poolifier.git diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index 1b80aa4c..84a0195e 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -14,12 +14,12 @@ import { PoolTypes, WorkerChoiceStrategies, WorkerTypes -} from '../../lib/index.js' -import { CircularArray } from '../../lib/circular-array.js' -import { Deque } from '../../lib/deque.js' -import { DEFAULT_TASK_NAME } from '../../lib/utils.js' -import { waitPoolEvents } from '../test-utils.js' -import { WorkerNode } from '../../lib/pools/worker-node.js' +} from '../../lib/index.cjs' +import { CircularArray } from '../../lib/circular-array.cjs' +import { Deque } from '../../lib/deque.cjs' +import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs' +import { waitPoolEvents } from '../test-utils.cjs' +import { WorkerNode } from '../../lib/pools/worker-node.cjs' describe('Abstract pool test suite', () => { const version = JSON.parse( @@ -105,7 +105,7 @@ describe('Abstract pool test suite', () => { it('Verify that a negative number of workers is checked', () => { expect( () => - new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js') + new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.cjs') ).toThrow( new RangeError( 'Cannot instantiate a pool with a negative number of workers' @@ -124,13 +124,29 @@ describe('Abstract pool test suite', () => { ) }) + it('Verify that pool arguments number and pool type are checked', () => { + expect( + () => + new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/testWorker.mjs', + undefined, + numberOfWorkers * 2 + ) + ).toThrow( + new Error( + 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization' + ) + ) + }) + it('Verify that dynamic pool sizing is checked', () => { expect( () => new DynamicClusterPool( 1, undefined, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) ).toThrow( new TypeError( @@ -154,7 +170,7 @@ describe('Abstract pool test suite', () => { new DynamicClusterPool( 0, 0.5, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) ).toThrow( new TypeError( @@ -190,7 +206,7 @@ describe('Abstract pool test suite', () => { new DynamicClusterPool( 1, 1, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) ).toThrow( new RangeError( @@ -205,32 +221,24 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testWorker.mjs' ) expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource) + expect(pool.emitter.eventNames()).toStrictEqual([]) expect(pool.opts).toStrictEqual({ startWorkers: true, enableEvents: true, restartWorkerOnError: true, enableTasksQueue: false, - workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN, - workerChoiceStrategyOptions: { - retries: 6, - runTime: { median: false }, - waitTime: { median: false }, - elu: { median: false } - } - }) - expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 6, - runTime: { median: false }, - waitTime: { median: false }, - elu: { median: false } + workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }) for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ - retries: 6, runTime: { median: false }, waitTime: { median: false }, - elu: { median: false } + elu: { median: false }, + weights: expect.objectContaining({ + 0: expect.any(Number), + [pool.info.maxSize - 1]: expect.any(Number) + }) }) } await pool.destroy() @@ -269,10 +277,7 @@ describe('Abstract pool test suite', () => { }, workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED, workerChoiceStrategyOptions: { - retries: 6, runTime: { median: true }, - waitTime: { median: false }, - elu: { median: false }, weights: { 0: 300, 1: 200 } }, onlineHandler: testHandler, @@ -280,17 +285,9 @@ describe('Abstract pool test suite', () => { errorHandler: testHandler, exitHandler: testHandler }) - expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 6, - runTime: { median: true }, - waitTime: { median: false }, - elu: { median: false }, - weights: { 0: 300, 1: 200 } - }) for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ - retries: 6, runTime: { median: true }, waitTime: { median: false }, elu: { median: false }, @@ -311,38 +308,6 @@ describe('Abstract pool test suite', () => { } ) ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'")) - expect( - () => - new FixedThreadPool( - numberOfWorkers, - './tests/worker-files/thread/testWorker.mjs', - { - workerChoiceStrategyOptions: { - retries: 'invalidChoiceRetries' - } - } - ) - ).toThrow( - new TypeError( - 'Invalid worker choice strategy options: retries must be an integer' - ) - ) - expect( - () => - new FixedThreadPool( - numberOfWorkers, - './tests/worker-files/thread/testWorker.mjs', - { - workerChoiceStrategyOptions: { - retries: -1 - } - } - ) - ).toThrow( - new RangeError( - "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero" - ) - ) expect( () => new FixedThreadPool( @@ -478,25 +443,17 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testWorker.mjs', { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE } ) - expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ - retries: 6, - runTime: { median: false }, - waitTime: { median: false }, - elu: { median: false } - }) - expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 6, - runTime: { median: false }, - waitTime: { median: false }, - elu: { median: false } - }) + expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined() for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ - retries: 6, runTime: { median: false }, waitTime: { median: false }, - elu: { median: false } + elu: { median: false }, + weights: expect.objectContaining({ + 0: expect.any(Number), + [pool.info.maxSize - 1]: expect.any(Number) + }) }) } expect( @@ -523,24 +480,19 @@ describe('Abstract pool test suite', () => { elu: { median: true } }) expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ - retries: 6, runTime: { median: true }, - waitTime: { median: false }, - elu: { median: true } - }) - expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 6, - runTime: { median: true }, - waitTime: { median: false }, elu: { median: true } }) for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ - retries: 6, runTime: { median: true }, waitTime: { median: false }, - elu: { median: true } + elu: { median: true }, + weights: expect.objectContaining({ + 0: expect.any(Number), + [pool.info.maxSize - 1]: expect.any(Number) + }) }) } expect( @@ -567,24 +519,19 @@ describe('Abstract pool test suite', () => { elu: { median: false } }) expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ - retries: 6, - runTime: { median: false }, - waitTime: { median: false }, - elu: { median: false } - }) - expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 6, runTime: { median: false }, - waitTime: { median: false }, elu: { median: false } }) for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ - retries: 6, runTime: { median: false }, waitTime: { median: false }, - elu: { median: false } + elu: { median: false }, + weights: expect.objectContaining({ + 0: expect.any(Number), + [pool.info.maxSize - 1]: expect.any(Number) + }) }) } expect( @@ -613,20 +560,6 @@ describe('Abstract pool test suite', () => { 'Invalid worker choice strategy options: must be a plain object' ) ) - expect(() => - pool.setWorkerChoiceStrategyOptions({ - retries: 'invalidChoiceRetries' - }) - ).toThrow( - new TypeError( - 'Invalid worker choice strategy options: retries must be an integer' - ) - ) - expect(() => pool.setWorkerChoiceStrategyOptions({ retries: -1 })).toThrow( - new RangeError( - "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero" - ) - ) expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow( new Error( 'Invalid worker choice strategy options: must have a weight for each worker node' @@ -784,7 +717,7 @@ describe('Abstract pool test suite', () => { pool = new DynamicClusterPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) expect(pool.info).toStrictEqual({ version, @@ -808,7 +741,7 @@ describe('Abstract pool test suite', () => { it('Verify that pool worker tasks usage are initialized', async () => { const pool = new FixedClusterPool( numberOfWorkers, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) for (const workerNode of pool.workerNodes) { expect(workerNode).toBeInstanceOf(WorkerNode) @@ -844,7 +777,7 @@ describe('Abstract pool test suite', () => { it('Verify that pool worker tasks queue are initialized', async () => { let pool = new FixedClusterPool( numberOfWorkers, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) for (const workerNode of pool.workerNodes) { expect(workerNode).toBeInstanceOf(WorkerNode) @@ -870,7 +803,7 @@ describe('Abstract pool test suite', () => { it('Verify that pool worker info are initialized', async () => { let pool = new FixedClusterPool( numberOfWorkers, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) for (const workerNode of pool.workerNodes) { expect(workerNode).toBeInstanceOf(WorkerNode) @@ -878,7 +811,8 @@ describe('Abstract pool test suite', () => { id: expect.any(Number), type: WorkerTypes.cluster, dynamic: false, - ready: true + ready: true, + stealing: false }) } await pool.destroy() @@ -893,7 +827,8 @@ describe('Abstract pool test suite', () => { id: expect.any(Number), type: WorkerTypes.thread, dynamic: false, - ready: true + ready: true, + stealing: false }) } await pool.destroy() @@ -920,15 +855,15 @@ describe('Abstract pool test suite', () => { it('Verify that pool can be started after initialization', async () => { const pool = new FixedClusterPool( numberOfWorkers, - './tests/worker-files/cluster/testWorker.js', + './tests/worker-files/cluster/testWorker.cjs', { startWorkers: false } ) expect(pool.info.started).toBe(false) expect(pool.info.ready).toBe(false) - expect(pool.readyEventEmitted).toBe(false) expect(pool.workerNodes).toStrictEqual([]) + expect(pool.readyEventEmitted).toBe(false) await expect(pool.execute()).rejects.toThrow( new Error('Cannot execute a task on not started pool') ) @@ -947,7 +882,7 @@ describe('Abstract pool test suite', () => { it('Verify that pool execute() arguments are checked', async () => { const pool = new FixedClusterPool( numberOfWorkers, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) await expect(pool.execute(undefined, 0)).rejects.toThrow( new TypeError('name argument must be a string') @@ -970,7 +905,7 @@ describe('Abstract pool test suite', () => { it('Verify that pool worker tasks usage are computed', async () => { const pool = new FixedClusterPool( numberOfWorkers, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) const promises = new Set() const maxMultiplier = 2 @@ -1121,7 +1056,7 @@ describe('Abstract pool test suite', () => { const pool = new DynamicClusterPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) expect(pool.emitter.eventNames()).toStrictEqual([]) let poolInfo @@ -1265,6 +1200,7 @@ describe('Abstract pool test suite', () => { maxSize: expect.any(Number), workerNodes: expect.any(Number), idleWorkerNodes: expect.any(Number), + stealingWorkerNodes: expect.any(Number), busyWorkerNodes: expect.any(Number), executedTasks: expect.any(Number), executingTasks: expect.any(Number), @@ -1274,8 +1210,65 @@ describe('Abstract pool test suite', () => { stolenTasks: expect.any(Number), failedTasks: expect.any(Number) }) - expect(pool.hasBackPressure.callCount).toBe(5) + expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7) + await pool.destroy() + }) + + it('Verify that destroy() waits for queued tasks to finish', async () => { + const tasksFinishedTimeout = 2500 + const pool = new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/asyncWorker.mjs', + { + enableTasksQueue: true, + tasksQueueOptions: { tasksFinishedTimeout } + } + ) + const maxMultiplier = 4 + let tasksFinished = 0 + for (const workerNode of pool.workerNodes) { + workerNode.on('taskFinished', () => { + ++tasksFinished + }) + } + for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) { + pool.execute() + } + expect(pool.info.queuedTasks).toBeGreaterThan(0) + const startTime = performance.now() + await pool.destroy() + const elapsedTime = performance.now() - startTime + expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier) + expect(elapsedTime).toBeGreaterThanOrEqual(2000) + expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800) + }) + + it('Verify that destroy() waits until the tasks finished timeout is reached', async () => { + const tasksFinishedTimeout = 1000 + const pool = new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/asyncWorker.mjs', + { + enableTasksQueue: true, + tasksQueueOptions: { tasksFinishedTimeout } + } + ) + const maxMultiplier = 4 + let tasksFinished = 0 + for (const workerNode of pool.workerNodes) { + workerNode.on('taskFinished', () => { + ++tasksFinished + }) + } + for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) { + pool.execute() + } + expect(pool.info.queuedTasks).toBeGreaterThan(0) + const startTime = performance.now() await pool.destroy() + const elapsedTime = performance.now() - startTime + expect(tasksFinished).toBe(0) + expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800) }) it('Verify that pool asynchronous resource track tasks execution', async () => { @@ -1332,7 +1325,7 @@ describe('Abstract pool test suite', () => { await dynamicThreadPool.destroy() const fixedClusterPool = new FixedClusterPool( numberOfWorkers, - './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js' + './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs' ) await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1) expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true) @@ -1472,7 +1465,7 @@ describe('Abstract pool test suite', () => { await dynamicThreadPool.destroy() const fixedClusterPool = new FixedClusterPool( numberOfWorkers, - './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js' + './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs' ) await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1) expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([ @@ -1542,7 +1535,7 @@ describe('Abstract pool test suite', () => { const pool = new DynamicClusterPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, - './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js' + './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs' ) const data = { n: 10 } const result0 = await pool.execute(data) @@ -1607,7 +1600,7 @@ describe('Abstract pool test suite', () => { const pool = new DynamicClusterPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) const workerNodeKey = 0 await expect( @@ -1620,7 +1613,7 @@ describe('Abstract pool test suite', () => { const pool = new DynamicClusterPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) const workerNodeKey = 0 await expect( @@ -1640,7 +1633,7 @@ describe('Abstract pool test suite', () => { const pool = new DynamicClusterPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) await expect( pool.sendTaskFunctionOperationToWorkers({