X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=tests%2Fpools%2Fabstract%2Fabstract-pool.test.js;h=e9c65dde91de251b6228c303861696a9f8629162;hb=2324f8c9233f6e036eee418327c46bb7e71356b9;hp=5435565a9419ede3b7f7222266c1dda03e416438;hpb=8c0b113f317f5e59160fb5174fd951330756221b;p=poolifier.git diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index 5435565a..e9c65dde 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -1,3 +1,4 @@ +const { EventEmitter } = require('node:events') const { expect } = require('expect') const sinon = require('sinon') const { @@ -15,6 +16,7 @@ const { Deque } = require('../../../lib/deque') const { DEFAULT_TASK_NAME } = require('../../../lib/utils') const { version } = require('../../../package.json') const { waitPoolEvents } = require('../../test-utils') +const { WorkerNode } = require('../../../lib/pools/worker-node') describe('Abstract pool test suite', () => { const numberOfWorkers = 2 @@ -35,7 +37,7 @@ describe('Abstract pool test suite', () => { numberOfWorkers, './tests/worker-files/thread/testWorker.js', { - errorHandler: (e) => console.error(e) + errorHandler: e => console.error(e) } ) ).toThrowError( @@ -53,7 +55,6 @@ describe('Abstract pool test suite', () => { expect(pool.starting).toBe(false) expect(pool.started).toBe(true) await pool.destroy() - expect(pool.started).toBe(false) }) it('Verify that filePath is checked', () => { @@ -179,19 +180,19 @@ describe('Abstract pool test suite', () => { numberOfWorkers, './tests/worker-files/thread/testWorker.js' ) - expect(pool.emitter).toBeDefined() - expect(pool.opts.enableEvents).toBe(true) - expect(pool.opts.restartWorkerOnError).toBe(true) - expect(pool.opts.enableTasksQueue).toBe(false) - expect(pool.opts.tasksQueueOptions).toBeUndefined() - expect(pool.opts.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.ROUND_ROBIN - ) - expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ - retries: 6, - runTime: { median: false }, - waitTime: { median: false }, - elu: { median: false } + expect(pool.emitter).toBeInstanceOf(EventEmitter) + expect(pool.opts).toStrictEqual({ + startWorkers: true, + enableEvents: true, + restartWorkerOnError: true, + enableTasksQueue: false, + workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN, + workerChoiceStrategyOptions: { + retries: 6, + runTime: { median: false }, + waitTime: { median: false }, + elu: { median: false } + } }) expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ retries: 6, @@ -199,10 +200,15 @@ describe('Abstract pool test suite', () => { waitTime: { median: false }, elu: { median: false } }) - expect(pool.opts.messageHandler).toBeUndefined() - expect(pool.opts.errorHandler).toBeUndefined() - expect(pool.opts.onlineHandler).toBeUndefined() - expect(pool.opts.exitHandler).toBeUndefined() + for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext + .workerChoiceStrategies) { + expect(workerChoiceStrategy.opts).toStrictEqual({ + retries: 6, + runTime: { median: false }, + waitTime: { median: false }, + elu: { median: false } + }) + } await pool.destroy() const testHandler = () => console.info('test handler executed') pool = new FixedThreadPool( @@ -225,22 +231,29 @@ describe('Abstract pool test suite', () => { } ) expect(pool.emitter).toBeUndefined() - expect(pool.opts.enableEvents).toBe(false) - expect(pool.opts.restartWorkerOnError).toBe(false) - expect(pool.opts.enableTasksQueue).toBe(true) - expect(pool.opts.tasksQueueOptions).toStrictEqual({ - concurrency: 2, - size: 4 - }) - expect(pool.opts.workerChoiceStrategy).toBe( - WorkerChoiceStrategies.LEAST_USED - ) - expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ - retries: 6, - runTime: { median: true }, - waitTime: { median: false }, - elu: { median: false }, - weights: { 0: 300, 1: 200 } + expect(pool.opts).toStrictEqual({ + startWorkers: true, + enableEvents: false, + restartWorkerOnError: false, + enableTasksQueue: true, + tasksQueueOptions: { + concurrency: 2, + size: Math.pow(numberOfWorkers, 2), + taskStealing: true, + tasksStealingOnBackPressure: true + }, + workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED, + workerChoiceStrategyOptions: { + retries: 6, + runTime: { median: true }, + waitTime: { median: false }, + elu: { median: false }, + weights: { 0: 300, 1: 200 } + }, + onlineHandler: testHandler, + messageHandler: testHandler, + errorHandler: testHandler, + exitHandler: testHandler }) expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ retries: 6, @@ -249,10 +262,16 @@ describe('Abstract pool test suite', () => { elu: { median: false }, weights: { 0: 300, 1: 200 } }) - expect(pool.opts.messageHandler).toStrictEqual(testHandler) - expect(pool.opts.errorHandler).toStrictEqual(testHandler) - expect(pool.opts.onlineHandler).toStrictEqual(testHandler) - expect(pool.opts.exitHandler).toStrictEqual(testHandler) + for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext + .workerChoiceStrategies) { + expect(workerChoiceStrategy.opts).toStrictEqual({ + retries: 6, + runTime: { median: true }, + waitTime: { median: false }, + elu: { median: false }, + weights: { 0: 300, 1: 200 } + }) + } await pool.destroy() }) @@ -385,21 +404,6 @@ describe('Abstract pool test suite', () => { ).toThrowError( new TypeError('Invalid worker node tasks concurrency: must be an integer') ) - expect( - () => - new FixedThreadPool( - numberOfWorkers, - './tests/worker-files/thread/testWorker.js', - { - enableTasksQueue: true, - tasksQueueOptions: { queueMaxSize: 2 } - } - ) - ).toThrowError( - new Error( - 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead' - ) - ) expect( () => new FixedThreadPool( @@ -626,21 +630,41 @@ describe('Abstract pool test suite', () => { ) expect(pool.opts.enableTasksQueue).toBe(false) expect(pool.opts.tasksQueueOptions).toBeUndefined() + for (const workerNode of pool.workerNodes) { + expect(workerNode.onEmptyQueue).toBeUndefined() + expect(workerNode.onBackPressure).toBeUndefined() + } pool.enableTasksQueue(true) expect(pool.opts.enableTasksQueue).toBe(true) expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1, - size: 4 + size: Math.pow(numberOfWorkers, 2), + taskStealing: true, + tasksStealingOnBackPressure: true }) + for (const workerNode of pool.workerNodes) { + expect(workerNode.onEmptyQueue).toBeInstanceOf(Function) + expect(workerNode.onBackPressure).toBeInstanceOf(Function) + } pool.enableTasksQueue(true, { concurrency: 2 }) expect(pool.opts.enableTasksQueue).toBe(true) expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2, - size: 4 + size: Math.pow(numberOfWorkers, 2), + taskStealing: true, + tasksStealingOnBackPressure: true }) + for (const workerNode of pool.workerNodes) { + expect(workerNode.onEmptyQueue).toBeInstanceOf(Function) + expect(workerNode.onBackPressure).toBeInstanceOf(Function) + } pool.enableTasksQueue(false) expect(pool.opts.enableTasksQueue).toBe(false) expect(pool.opts.tasksQueueOptions).toBeUndefined() + for (const workerNode of pool.workerNodes) { + expect(workerNode.onEmptyQueue).toBeUndefined() + expect(workerNode.onBackPressure).toBeUndefined() + } await pool.destroy() }) @@ -652,13 +676,54 @@ describe('Abstract pool test suite', () => { ) expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1, - size: 4 + size: Math.pow(numberOfWorkers, 2), + taskStealing: true, + tasksStealingOnBackPressure: true + }) + for (const workerNode of pool.workerNodes) { + expect(workerNode.tasksQueueBackPressureSize).toBe( + pool.opts.tasksQueueOptions.size + ) + expect(workerNode.onEmptyQueue).toBeInstanceOf(Function) + expect(workerNode.onBackPressure).toBeInstanceOf(Function) + } + pool.setTasksQueueOptions({ + concurrency: 2, + size: 2, + taskStealing: false, + tasksStealingOnBackPressure: false }) - pool.setTasksQueueOptions({ concurrency: 2 }) expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2, - size: 4 + size: 2, + taskStealing: false, + tasksStealingOnBackPressure: false }) + for (const workerNode of pool.workerNodes) { + expect(workerNode.tasksQueueBackPressureSize).toBe( + pool.opts.tasksQueueOptions.size + ) + expect(workerNode.onEmptyQueue).toBeUndefined() + expect(workerNode.onBackPressure).toBeUndefined() + } + pool.setTasksQueueOptions({ + concurrency: 1, + taskStealing: true, + tasksStealingOnBackPressure: true + }) + expect(pool.opts.tasksQueueOptions).toStrictEqual({ + concurrency: 1, + size: Math.pow(numberOfWorkers, 2), + taskStealing: true, + tasksStealingOnBackPressure: true + }) + for (const workerNode of pool.workerNodes) { + expect(workerNode.tasksQueueBackPressureSize).toBe( + pool.opts.tasksQueueOptions.size + ) + expect(workerNode.onEmptyQueue).toBeInstanceOf(Function) + expect(workerNode.onBackPressure).toBeInstanceOf(Function) + } expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions') ).toThrowError( @@ -677,11 +742,6 @@ describe('Abstract pool test suite', () => { expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError( new TypeError('Invalid worker node tasks concurrency: must be an integer') ) - expect(() => pool.setTasksQueueOptions({ queueMaxSize: 2 })).toThrowError( - new Error( - 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead' - ) - ) expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrowError( new RangeError( 'Invalid worker node tasks queue size: 0 is a negative integer or zero' @@ -707,6 +767,7 @@ describe('Abstract pool test suite', () => { version, type: PoolTypes.fixed, worker: WorkerTypes.thread, + started: true, ready: true, strategy: WorkerChoiceStrategies.ROUND_ROBIN, minSize: numberOfWorkers, @@ -728,6 +789,7 @@ describe('Abstract pool test suite', () => { version, type: PoolTypes.dynamic, worker: WorkerTypes.cluster, + started: true, ready: true, strategy: WorkerChoiceStrategies.ROUND_ROBIN, minSize: Math.floor(numberOfWorkers / 2), @@ -748,6 +810,7 @@ describe('Abstract pool test suite', () => { './tests/worker-files/cluster/testWorker.js' ) for (const workerNode of pool.workerNodes) { + expect(workerNode).toBeInstanceOf(WorkerNode) expect(workerNode.usage).toStrictEqual({ tasks: { executed: 0, @@ -758,17 +821,17 @@ describe('Abstract pool test suite', () => { failed: 0 }, runTime: { - history: expect.any(CircularArray) + history: new CircularArray() }, waitTime: { - history: expect.any(CircularArray) + history: new CircularArray() }, elu: { idle: { - history: expect.any(CircularArray) + history: new CircularArray() }, active: { - history: expect.any(CircularArray) + history: new CircularArray() } } }) @@ -782,7 +845,7 @@ describe('Abstract pool test suite', () => { './tests/worker-files/cluster/testWorker.js' ) for (const workerNode of pool.workerNodes) { - expect(workerNode.tasksQueue).toBeDefined() + expect(workerNode).toBeInstanceOf(WorkerNode) expect(workerNode.tasksQueue).toBeInstanceOf(Deque) expect(workerNode.tasksQueue.size).toBe(0) expect(workerNode.tasksQueue.maxSize).toBe(0) @@ -794,7 +857,7 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testWorker.js' ) for (const workerNode of pool.workerNodes) { - expect(workerNode.tasksQueue).toBeDefined() + expect(workerNode).toBeInstanceOf(WorkerNode) expect(workerNode.tasksQueue).toBeInstanceOf(Deque) expect(workerNode.tasksQueue.size).toBe(0) expect(workerNode.tasksQueue.maxSize).toBe(0) @@ -808,6 +871,7 @@ describe('Abstract pool test suite', () => { './tests/worker-files/cluster/testWorker.js' ) for (const workerNode of pool.workerNodes) { + expect(workerNode).toBeInstanceOf(WorkerNode) expect(workerNode.info).toStrictEqual({ id: expect.any(Number), type: WorkerTypes.cluster, @@ -822,6 +886,7 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testWorker.js' ) for (const workerNode of pool.workerNodes) { + expect(workerNode).toBeInstanceOf(WorkerNode) expect(workerNode.info).toStrictEqual({ id: expect.any(Number), type: WorkerTypes.thread, @@ -832,6 +897,30 @@ describe('Abstract pool test suite', () => { await pool.destroy() }) + it('Verify that pool can be started after initialization', async () => { + const pool = new FixedClusterPool( + numberOfWorkers, + './tests/worker-files/cluster/testWorker.js', + { + startWorkers: false + } + ) + expect(pool.info.started).toBe(false) + expect(pool.info.ready).toBe(false) + expect(pool.workerNodes).toStrictEqual([]) + await expect(pool.execute()).rejects.toThrowError( + new Error('Cannot execute a task on not started pool') + ) + pool.start() + expect(pool.info.started).toBe(true) + expect(pool.info.ready).toBe(true) + expect(pool.workerNodes.length).toBe(numberOfWorkers) + for (const workerNode of pool.workerNodes) { + expect(workerNode).toBeInstanceOf(WorkerNode) + } + await pool.destroy() + }) + it('Verify that pool execute() arguments are checked', async () => { const pool = new FixedClusterPool( numberOfWorkers, @@ -850,8 +939,8 @@ describe('Abstract pool test suite', () => { "Task function 'unknown' not found" ) await pool.destroy() - await expect(pool.execute(undefined, undefined, {})).rejects.toThrowError( - new Error('Cannot execute a task on destroyed pool') + await expect(pool.execute()).rejects.toThrowError( + new Error('Cannot execute a task on not started pool') ) }) @@ -1009,7 +1098,7 @@ describe('Abstract pool test suite', () => { ) let poolInfo let poolReady = 0 - pool.emitter.on(PoolEvents.ready, (info) => { + pool.emitter.on(PoolEvents.ready, info => { ++poolReady poolInfo = info }) @@ -1019,6 +1108,7 @@ describe('Abstract pool test suite', () => { version, type: PoolTypes.dynamic, worker: WorkerTypes.cluster, + started: true, ready: true, strategy: WorkerChoiceStrategies.ROUND_ROBIN, minSize: expect.any(Number), @@ -1041,7 +1131,7 @@ describe('Abstract pool test suite', () => { const promises = new Set() let poolBusy = 0 let poolInfo - pool.emitter.on(PoolEvents.busy, (info) => { + pool.emitter.on(PoolEvents.busy, info => { ++poolBusy poolInfo = info }) @@ -1056,7 +1146,8 @@ describe('Abstract pool test suite', () => { version, type: PoolTypes.fixed, worker: WorkerTypes.thread, - ready: expect.any(Boolean), + started: true, + ready: true, strategy: WorkerChoiceStrategies.ROUND_ROBIN, minSize: expect.any(Number), maxSize: expect.any(Number), @@ -1079,7 +1170,7 @@ describe('Abstract pool test suite', () => { const promises = new Set() let poolFull = 0 let poolInfo - pool.emitter.on(PoolEvents.full, (info) => { + pool.emitter.on(PoolEvents.full, info => { ++poolFull poolInfo = info }) @@ -1092,7 +1183,8 @@ describe('Abstract pool test suite', () => { version, type: PoolTypes.dynamic, worker: WorkerTypes.thread, - ready: expect.any(Boolean), + started: true, + ready: true, strategy: WorkerChoiceStrategies.ROUND_ROBIN, minSize: expect.any(Number), maxSize: expect.any(Number), @@ -1118,7 +1210,7 @@ describe('Abstract pool test suite', () => { const promises = new Set() let poolBackPressure = 0 let poolInfo - pool.emitter.on(PoolEvents.backPressure, (info) => { + pool.emitter.on(PoolEvents.backPressure, info => { ++poolBackPressure poolInfo = info }) @@ -1131,7 +1223,8 @@ describe('Abstract pool test suite', () => { version, type: PoolTypes.fixed, worker: WorkerTypes.thread, - ready: expect.any(Boolean), + started: true, + ready: true, strategy: WorkerChoiceStrategies.ROUND_ROBIN, minSize: expect.any(Number), maxSize: expect.any(Number), @@ -1207,7 +1300,7 @@ describe('Abstract pool test suite', () => { expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({ tasks: { executed: expect.any(Number), - executing: expect.any(Number), + executing: 0, failed: 0, queued: 0, stolen: 0 @@ -1228,10 +1321,31 @@ describe('Abstract pool test suite', () => { } }) expect( - workerNode.getTaskFunctionWorkerUsage(name).tasks.executing - ).toBeGreaterThanOrEqual(0) + workerNode.getTaskFunctionWorkerUsage(name).tasks.executed + ).toBeGreaterThan(0) } + expect( + workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME) + ).toStrictEqual( + workerNode.getTaskFunctionWorkerUsage(workerNode.info.taskFunctions[1]) + ) } await pool.destroy() }) + + it('Verify sendKillMessageToWorker()', async () => { + const pool = new DynamicClusterPool( + Math.floor(numberOfWorkers / 2), + numberOfWorkers, + './tests/worker-files/cluster/testWorker.js' + ) + const workerNodeKey = 0 + await expect( + pool.sendKillMessageToWorker( + workerNodeKey, + pool.workerNodes[workerNodeKey].info.id + ) + ).resolves.toBeUndefined() + await pool.destroy() + }) })