X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=tests%2Fpools%2Fabstract-pool.test.mjs;h=0c91349765f65c80884d4c50d8b45d93424e37fd;hb=f12182ad6dc553c7a5dfeee01bcde65c0177f671;hp=50656e85f37bfc4bfe60af9edb69881876729438;hpb=31847469b406e46688d8aafb880e250706dd8aee;p=poolifier.git diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index 50656e85..0c913497 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -8,8 +8,7 @@ import { fileURLToPath } from 'node:url' import { expect } from 'expect' import { restore, stub } from 'sinon' -import { CircularArray } from '../../lib/circular-array.cjs' -import { Deque } from '../../lib/deque.cjs' +import { CircularBuffer } from '../../lib/circular-buffer.cjs' import { DynamicClusterPool, DynamicThreadPool, @@ -21,6 +20,7 @@ import { WorkerTypes } from '../../lib/index.cjs' import { WorkerNode } from '../../lib/pools/worker-node.cjs' +import { PriorityQueue } from '../../lib/priority-queue.cjs' import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs' import { waitPoolEvents } from '../test-utils.cjs' @@ -232,7 +232,7 @@ describe('Abstract pool test suite', () => { enableTasksQueue: false, workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }) - for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext + for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ runTime: { median: false }, @@ -275,7 +275,7 @@ describe('Abstract pool test suite', () => { concurrency: 2, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true, + tasksStealingOnBackPressure: false, tasksFinishedTimeout: 2000 }, workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED, @@ -288,7 +288,7 @@ describe('Abstract pool test suite', () => { errorHandler: testHandler, exitHandler: testHandler }) - for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext + for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ runTime: { median: true }, @@ -447,7 +447,7 @@ describe('Abstract pool test suite', () => { { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE } ) expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined() - for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext + for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ runTime: { median: false }, @@ -460,7 +460,7 @@ describe('Abstract pool test suite', () => { }) } expect( - pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements() ).toStrictEqual({ runTime: { aggregate: true, @@ -468,8 +468,8 @@ describe('Abstract pool test suite', () => { median: false }, waitTime: { - aggregate: false, - average: false, + aggregate: true, + average: true, median: false }, elu: { @@ -486,7 +486,7 @@ describe('Abstract pool test suite', () => { runTime: { median: true }, elu: { median: true } }) - for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext + for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ runTime: { median: true }, @@ -499,7 +499,7 @@ describe('Abstract pool test suite', () => { }) } expect( - pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements() ).toStrictEqual({ runTime: { aggregate: true, @@ -507,8 +507,8 @@ describe('Abstract pool test suite', () => { median: true }, waitTime: { - aggregate: false, - average: false, + aggregate: true, + average: true, median: false }, elu: { @@ -525,7 +525,7 @@ describe('Abstract pool test suite', () => { runTime: { median: false }, elu: { median: false } }) - for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext + for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ runTime: { median: false }, @@ -538,7 +538,7 @@ describe('Abstract pool test suite', () => { }) } expect( - pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements() ).toStrictEqual({ runTime: { aggregate: true, @@ -546,8 +546,8 @@ describe('Abstract pool test suite', () => { median: false }, waitTime: { - aggregate: false, - average: false, + aggregate: true, + average: true, median: false }, elu: { @@ -591,7 +591,7 @@ describe('Abstract pool test suite', () => { concurrency: 1, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true, + tasksStealingOnBackPressure: false, tasksFinishedTimeout: 2000 }) pool.enableTasksQueue(true, { concurrency: 2 }) @@ -600,7 +600,7 @@ describe('Abstract pool test suite', () => { concurrency: 2, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true, + tasksStealingOnBackPressure: false, tasksFinishedTimeout: 2000 }) pool.enableTasksQueue(false) @@ -619,7 +619,7 @@ describe('Abstract pool test suite', () => { concurrency: 1, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true, + tasksStealingOnBackPressure: false, tasksFinishedTimeout: 2000 }) for (const workerNode of pool.workerNodes) { @@ -706,7 +706,7 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.thread, started: true, ready: true, - strategy: WorkerChoiceStrategies.ROUND_ROBIN, + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, strategyRetries: 0, minSize: numberOfWorkers, maxSize: numberOfWorkers, @@ -729,7 +729,7 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.cluster, started: true, ready: true, - strategy: WorkerChoiceStrategies.ROUND_ROBIN, + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, strategyRetries: 0, minSize: Math.floor(numberOfWorkers / 2), maxSize: numberOfWorkers, @@ -761,17 +761,17 @@ describe('Abstract pool test suite', () => { failed: 0 }, runTime: { - history: new CircularArray() + history: expect.any(CircularBuffer) }, waitTime: { - history: new CircularArray() + history: expect.any(CircularBuffer) }, elu: { idle: { - history: new CircularArray() + history: expect.any(CircularBuffer) }, active: { - history: new CircularArray() + history: expect.any(CircularBuffer) } } }) @@ -786,9 +786,10 @@ describe('Abstract pool test suite', () => { ) for (const workerNode of pool.workerNodes) { expect(workerNode).toBeInstanceOf(WorkerNode) - expect(workerNode.tasksQueue).toBeInstanceOf(Deque) + expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue) expect(workerNode.tasksQueue.size).toBe(0) expect(workerNode.tasksQueue.maxSize).toBe(0) + expect(workerNode.tasksQueue.bucketSize).toBe(numberOfWorkers * 2) } await pool.destroy() pool = new DynamicThreadPool( @@ -798,9 +799,10 @@ describe('Abstract pool test suite', () => { ) for (const workerNode of pool.workerNodes) { expect(workerNode).toBeInstanceOf(WorkerNode) - expect(workerNode.tasksQueue).toBeInstanceOf(Deque) + expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue) expect(workerNode.tasksQueue.size).toBe(0) expect(workerNode.tasksQueue.maxSize).toBe(0) + expect(workerNode.tasksQueue.bucketSize).toBe(numberOfWorkers * 2) } await pool.destroy() }) @@ -817,7 +819,8 @@ describe('Abstract pool test suite', () => { type: WorkerTypes.cluster, dynamic: false, ready: true, - stealing: false + stealing: false, + backPressure: false }) } await pool.destroy() @@ -833,7 +836,8 @@ describe('Abstract pool test suite', () => { type: WorkerTypes.thread, dynamic: false, ready: true, - stealing: false + stealing: false, + backPressure: false }) } await pool.destroy() @@ -929,17 +933,17 @@ describe('Abstract pool test suite', () => { failed: 0 }, runTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, waitTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, elu: { idle: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, active: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) } } }) @@ -957,17 +961,17 @@ describe('Abstract pool test suite', () => { failed: 0 }, runTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, waitTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, elu: { idle: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, active: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) } } }) @@ -975,7 +979,7 @@ describe('Abstract pool test suite', () => { await pool.destroy() }) - it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => { + it("Verify that pool worker tasks usage aren't reset at worker choice strategy change", async () => { const pool = new DynamicThreadPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, @@ -999,17 +1003,17 @@ describe('Abstract pool test suite', () => { failed: 0 }, runTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, waitTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, elu: { idle: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, active: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) } } }) @@ -1017,16 +1021,12 @@ describe('Abstract pool test suite', () => { expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( numberOfWorkers * maxMultiplier ) - expect(workerNode.usage.runTime.history.length).toBe(0) - expect(workerNode.usage.waitTime.history.length).toBe(0) - expect(workerNode.usage.elu.idle.history.length).toBe(0) - expect(workerNode.usage.elu.active.history.length).toBe(0) } pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE) for (const workerNode of pool.workerNodes) { expect(workerNode.usage).toStrictEqual({ tasks: { - executed: 0, + executed: expect.any(Number), executing: 0, queued: 0, maxQueued: 0, @@ -1035,24 +1035,24 @@ describe('Abstract pool test suite', () => { failed: 0 }, runTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, waitTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, elu: { idle: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, active: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) } } }) - expect(workerNode.usage.runTime.history.length).toBe(0) - expect(workerNode.usage.waitTime.history.length).toBe(0) - expect(workerNode.usage.elu.idle.history.length).toBe(0) - expect(workerNode.usage.elu.active.history.length).toBe(0) + expect(workerNode.usage.tasks.executed).toBeGreaterThan(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + numberOfWorkers * maxMultiplier + ) } await pool.destroy() }) @@ -1079,7 +1079,7 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.cluster, started: true, ready: true, - strategy: WorkerChoiceStrategies.ROUND_ROBIN, + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, strategyRetries: expect.any(Number), minSize: expect.any(Number), maxSize: expect.any(Number), @@ -1120,7 +1120,7 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.thread, started: true, ready: true, - strategy: WorkerChoiceStrategies.ROUND_ROBIN, + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, strategyRetries: expect.any(Number), minSize: expect.any(Number), maxSize: expect.any(Number), @@ -1160,7 +1160,7 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.thread, started: true, ready: true, - strategy: WorkerChoiceStrategies.ROUND_ROBIN, + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, strategyRetries: expect.any(Number), minSize: expect.any(Number), maxSize: expect.any(Number), @@ -1203,7 +1203,7 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.thread, started: true, ready: true, - strategy: WorkerChoiceStrategies.ROUND_ROBIN, + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, strategyRetries: expect.any(Number), minSize: expect.any(Number), maxSize: expect.any(Number), @@ -1368,24 +1368,67 @@ describe('Abstract pool test suite', () => { await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow( new TypeError('taskFunction property must be a function') ) + await expect( + dynamicThreadPool.addTaskFunction('test', { taskFunction: 0 }) + ).rejects.toThrow(new TypeError('taskFunction property must be a function')) + await expect( + dynamicThreadPool.addTaskFunction('test', { taskFunction: '' }) + ).rejects.toThrow(new TypeError('taskFunction property must be a function')) + await expect( + dynamicThreadPool.addTaskFunction('test', { + taskFunction: () => {}, + priority: -21 + }) + ).rejects.toThrow( + new RangeError("Property 'priority' must be between -20 and 19") + ) + await expect( + dynamicThreadPool.addTaskFunction('test', { + taskFunction: () => {}, + priority: 20 + }) + ).rejects.toThrow( + new RangeError("Property 'priority' must be between -20 and 19") + ) + await expect( + dynamicThreadPool.addTaskFunction('test', { + taskFunction: () => {}, + strategy: 'invalidStrategy' + }) + ).rejects.toThrow( + new Error("Invalid worker choice strategy 'invalidStrategy'") + ) expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ { name: DEFAULT_TASK_NAME }, { name: 'test' } ]) + expect([ + ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys() + ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN]) const echoTaskFunction = data => { return data } await expect( - dynamicThreadPool.addTaskFunction('echo', echoTaskFunction) + dynamicThreadPool.addTaskFunction('echo', { + taskFunction: echoTaskFunction, + strategy: WorkerChoiceStrategies.LEAST_ELU + }) ).resolves.toBe(true) expect(dynamicThreadPool.taskFunctions.size).toBe(1) expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({ - taskFunction: echoTaskFunction + taskFunction: echoTaskFunction, + strategy: WorkerChoiceStrategies.LEAST_ELU }) + expect([ + ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys() + ]).toStrictEqual([ + WorkerChoiceStrategies.ROUND_ROBIN, + WorkerChoiceStrategies.LEAST_ELU + ]) expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ { name: DEFAULT_TASK_NAME }, { name: 'test' }, - { name: 'echo' } + { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU } ]) const taskFunctionData = { test: 'test' } const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo') @@ -1401,20 +1444,60 @@ describe('Abstract pool test suite', () => { failed: 0 }, runTime: { - history: new CircularArray() + history: expect.any(CircularBuffer) }, waitTime: { - history: new CircularArray() + history: expect.any(CircularBuffer) }, - elu: { - idle: { - history: new CircularArray() - }, - active: { - history: new CircularArray() - } - } + elu: expect.objectContaining({ + idle: expect.objectContaining({ + history: expect.any(CircularBuffer) + }), + active: expect.objectContaining({ + history: expect.any(CircularBuffer) + }) + }) }) + expect( + workerNode.getTaskFunctionWorkerUsage('echo').tasks.executed + ).toBeGreaterThan(0) + if ( + workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate == + null + ) { + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate + ).toBeUndefined() + } else { + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate + ).toBeGreaterThan(0) + } + if ( + workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate == null + ) { + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate + ).toBeUndefined() + } else { + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate + ).toBeGreaterThanOrEqual(0) + } + if ( + workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization == null + ) { + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization + ).toBeUndefined() + } else { + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization + ).toBeGreaterThanOrEqual(0) + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization + ).toBeLessThanOrEqual(1) + } } await dynamicThreadPool.destroy() }) @@ -1436,21 +1519,34 @@ describe('Abstract pool test suite', () => { const echoTaskFunction = data => { return data } - await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction) + await dynamicThreadPool.addTaskFunction('echo', { + taskFunction: echoTaskFunction, + strategy: WorkerChoiceStrategies.LEAST_ELU + }) expect(dynamicThreadPool.taskFunctions.size).toBe(1) expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({ - taskFunction: echoTaskFunction + taskFunction: echoTaskFunction, + strategy: WorkerChoiceStrategies.LEAST_ELU }) + expect([ + ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys() + ]).toStrictEqual([ + WorkerChoiceStrategies.ROUND_ROBIN, + WorkerChoiceStrategies.LEAST_ELU + ]) expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ { name: DEFAULT_TASK_NAME }, { name: 'test' }, - { name: 'echo' } + { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU } ]) await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe( true ) expect(dynamicThreadPool.taskFunctions.size).toBe(0) expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined() + expect([ + ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys() + ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN]) expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ { name: DEFAULT_TASK_NAME }, { name: 'test' } @@ -1458,7 +1554,7 @@ describe('Abstract pool test suite', () => { await dynamicThreadPool.destroy() }) - it('Verify that listTaskFunctionNames() is working', async () => { + it('Verify that listTaskFunctionsProperties() is working', async () => { const dynamicThreadPool = new DynamicThreadPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, @@ -1565,6 +1661,76 @@ describe('Abstract pool test suite', () => { { name: 'fibonacci' } ]) expect(workerNode.taskFunctionsUsage.size).toBe(3) + expect(workerNode.usage.tasks.executed).toBeGreaterThan(0) + for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) { + expect( + workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name) + ).toStrictEqual({ + tasks: { + executed: expect.any(Number), + executing: 0, + failed: 0, + queued: 0, + sequentiallyStolen: 0, + stolen: 0 + }, + runTime: { + history: expect.any(CircularBuffer) + }, + waitTime: { + history: expect.any(CircularBuffer) + }, + elu: { + idle: { + history: expect.any(CircularBuffer) + }, + active: { + history: expect.any(CircularBuffer) + } + } + }) + expect( + workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name) + .tasks.executed + ).toBeGreaterThan(0) + } + expect( + workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME) + ).toStrictEqual( + workerNode.getTaskFunctionWorkerUsage( + workerNode.info.taskFunctionsProperties[1].name + ) + ) + } + await pool.destroy() + }) + + it('Verify that task function objects worker is working', async () => { + const pool = new DynamicThreadPool( + Math.floor(numberOfWorkers / 2), + numberOfWorkers, + './tests/worker-files/thread/testTaskFunctionObjectsWorker.mjs' + ) + const data = { n: 10 } + const result0 = await pool.execute(data) + expect(result0).toStrictEqual({ ok: 1 }) + const result1 = await pool.execute(data, 'jsonIntegerSerialization') + expect(result1).toStrictEqual({ ok: 1 }) + const result2 = await pool.execute(data, 'factorial') + expect(result2).toBe(3628800) + const result3 = await pool.execute(data, 'fibonacci') + expect(result3).toBe(55) + expect(pool.info.executingTasks).toBe(0) + expect(pool.info.executedTasks).toBe(4) + for (const workerNode of pool.workerNodes) { + expect(workerNode.info.taskFunctionsProperties).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'jsonIntegerSerialization' }, + { name: 'factorial' }, + { name: 'fibonacci' } + ]) + expect(workerNode.taskFunctionsUsage.size).toBe(3) + expect(workerNode.usage.tasks.executed).toBeGreaterThan(0) for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) { expect( workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name) @@ -1578,17 +1744,17 @@ describe('Abstract pool test suite', () => { stolen: 0 }, runTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, waitTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, elu: { idle: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, active: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) } } })