X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=tests%2Fpools%2Fabstract-pool.test.mjs;h=8cbbc0a5dc59e522f3328271dbfd8cd812586388;hb=2eee72204bc851f616ded11cb5381f96c6dc5cbf;hp=ba328b6809dec8c0295c00e89331a0b0a1e22800;hpb=bcfb06ce041a682baf396a099c633a848d6a4045;p=poolifier.git diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index ba328b68..8cbbc0a5 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -9,7 +9,6 @@ import { expect } from 'expect' import { restore, stub } from 'sinon' import { CircularArray } from '../../lib/circular-array.cjs' -import { Deque } from '../../lib/deque.cjs' import { DynamicClusterPool, DynamicThreadPool, @@ -21,6 +20,7 @@ import { WorkerTypes } from '../../lib/index.cjs' import { WorkerNode } from '../../lib/pools/worker-node.cjs' +import { PriorityQueue } from '../../lib/priority-queue.cjs' import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs' import { waitPoolEvents } from '../test-utils.cjs' @@ -275,7 +275,7 @@ describe('Abstract pool test suite', () => { concurrency: 2, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true, + tasksStealingOnBackPressure: false, tasksFinishedTimeout: 2000 }, workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED, @@ -468,8 +468,8 @@ describe('Abstract pool test suite', () => { median: false }, waitTime: { - aggregate: false, - average: false, + aggregate: true, + average: true, median: false }, elu: { @@ -507,8 +507,8 @@ describe('Abstract pool test suite', () => { median: true }, waitTime: { - aggregate: false, - average: false, + aggregate: true, + average: true, median: false }, elu: { @@ -546,8 +546,8 @@ describe('Abstract pool test suite', () => { median: false }, waitTime: { - aggregate: false, - average: false, + aggregate: true, + average: true, median: false }, elu: { @@ -591,7 +591,7 @@ describe('Abstract pool test suite', () => { concurrency: 1, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true, + tasksStealingOnBackPressure: false, tasksFinishedTimeout: 2000 }) pool.enableTasksQueue(true, { concurrency: 2 }) @@ -600,7 +600,7 @@ describe('Abstract pool test suite', () => { concurrency: 2, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true, + tasksStealingOnBackPressure: false, tasksFinishedTimeout: 2000 }) pool.enableTasksQueue(false) @@ -619,7 +619,7 @@ describe('Abstract pool test suite', () => { concurrency: 1, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true, + tasksStealingOnBackPressure: false, tasksFinishedTimeout: 2000 }) for (const workerNode of pool.workerNodes) { @@ -786,9 +786,10 @@ describe('Abstract pool test suite', () => { ) for (const workerNode of pool.workerNodes) { expect(workerNode).toBeInstanceOf(WorkerNode) - expect(workerNode.tasksQueue).toBeInstanceOf(Deque) + expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue) expect(workerNode.tasksQueue.size).toBe(0) expect(workerNode.tasksQueue.maxSize).toBe(0) + expect(workerNode.tasksQueue.k).toBe(numberOfWorkers * 2) } await pool.destroy() pool = new DynamicThreadPool( @@ -798,9 +799,10 @@ describe('Abstract pool test suite', () => { ) for (const workerNode of pool.workerNodes) { expect(workerNode).toBeInstanceOf(WorkerNode) - expect(workerNode.tasksQueue).toBeInstanceOf(Deque) + expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue) expect(workerNode.tasksQueue.size).toBe(0) expect(workerNode.tasksQueue.maxSize).toBe(0) + expect(workerNode.tasksQueue.k).toBe(numberOfWorkers * 2) } await pool.destroy() }) @@ -817,6 +819,7 @@ describe('Abstract pool test suite', () => { type: WorkerTypes.cluster, dynamic: false, ready: true, + backPressure: false, stealing: false }) } @@ -833,6 +836,7 @@ describe('Abstract pool test suite', () => { type: WorkerTypes.thread, dynamic: false, ready: true, + backPressure: false, stealing: false }) } @@ -1372,24 +1376,67 @@ describe('Abstract pool test suite', () => { await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow( new TypeError('taskFunction property must be a function') ) + await expect( + dynamicThreadPool.addTaskFunction('test', { taskFunction: 0 }) + ).rejects.toThrow(new TypeError('taskFunction property must be a function')) + await expect( + dynamicThreadPool.addTaskFunction('test', { taskFunction: '' }) + ).rejects.toThrow(new TypeError('taskFunction property must be a function')) + await expect( + dynamicThreadPool.addTaskFunction('test', { + taskFunction: () => {}, + priority: -21 + }) + ).rejects.toThrow( + new RangeError("Property 'priority' must be between -20 and 19") + ) + await expect( + dynamicThreadPool.addTaskFunction('test', { + taskFunction: () => {}, + priority: 20 + }) + ).rejects.toThrow( + new RangeError("Property 'priority' must be between -20 and 19") + ) + await expect( + dynamicThreadPool.addTaskFunction('test', { + taskFunction: () => {}, + strategy: 'invalidStrategy' + }) + ).rejects.toThrow( + new Error("Invalid worker choice strategy 'invalidStrategy'") + ) expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ { name: DEFAULT_TASK_NAME }, { name: 'test' } ]) + expect([ + ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys() + ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN]) const echoTaskFunction = data => { return data } await expect( - dynamicThreadPool.addTaskFunction('echo', echoTaskFunction) + dynamicThreadPool.addTaskFunction('echo', { + taskFunction: echoTaskFunction, + strategy: WorkerChoiceStrategies.LEAST_ELU + }) ).resolves.toBe(true) expect(dynamicThreadPool.taskFunctions.size).toBe(1) expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({ - taskFunction: echoTaskFunction + taskFunction: echoTaskFunction, + strategy: WorkerChoiceStrategies.LEAST_ELU }) + expect([ + ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys() + ]).toStrictEqual([ + WorkerChoiceStrategies.ROUND_ROBIN, + WorkerChoiceStrategies.LEAST_ELU + ]) expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ { name: DEFAULT_TASK_NAME }, { name: 'test' }, - { name: 'echo' } + { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU } ]) const taskFunctionData = { test: 'test' } const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo') @@ -1410,15 +1457,55 @@ describe('Abstract pool test suite', () => { waitTime: { history: new CircularArray() }, - elu: { - idle: { - history: new CircularArray() - }, - active: { - history: new CircularArray() - } - } + elu: expect.objectContaining({ + idle: expect.objectContaining({ + history: expect.any(CircularArray) + }), + active: expect.objectContaining({ + history: expect.any(CircularArray) + }) + }) }) + expect( + workerNode.getTaskFunctionWorkerUsage('echo').tasks.executed + ).toBeGreaterThan(0) + if ( + workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate == + null + ) { + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate + ).toBeUndefined() + } else { + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate + ).toBeGreaterThan(0) + } + if ( + workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate == null + ) { + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate + ).toBeUndefined() + } else { + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate + ).toBeGreaterThanOrEqual(0) + } + if ( + workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization == null + ) { + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization + ).toBeUndefined() + } else { + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization + ).toBeGreaterThanOrEqual(0) + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization + ).toBeLessThanOrEqual(1) + } } await dynamicThreadPool.destroy() }) @@ -1440,21 +1527,34 @@ describe('Abstract pool test suite', () => { const echoTaskFunction = data => { return data } - await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction) + await dynamicThreadPool.addTaskFunction('echo', { + taskFunction: echoTaskFunction, + strategy: WorkerChoiceStrategies.LEAST_ELU + }) expect(dynamicThreadPool.taskFunctions.size).toBe(1) expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({ - taskFunction: echoTaskFunction + taskFunction: echoTaskFunction, + strategy: WorkerChoiceStrategies.LEAST_ELU }) + expect([ + ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys() + ]).toStrictEqual([ + WorkerChoiceStrategies.ROUND_ROBIN, + WorkerChoiceStrategies.LEAST_ELU + ]) expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ { name: DEFAULT_TASK_NAME }, { name: 'test' }, - { name: 'echo' } + { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU } ]) await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe( true ) expect(dynamicThreadPool.taskFunctions.size).toBe(0) expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined() + expect([ + ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys() + ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN]) expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ { name: DEFAULT_TASK_NAME }, { name: 'test' } @@ -1462,7 +1562,7 @@ describe('Abstract pool test suite', () => { await dynamicThreadPool.destroy() }) - it('Verify that listTaskFunctionNames() is working', async () => { + it('Verify that listTaskFunctionsProperties() is working', async () => { const dynamicThreadPool = new DynamicThreadPool( Math.floor(numberOfWorkers / 2), numberOfWorkers,