X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=tests%2Fpools%2Fabstract-pool.test.mjs;h=ec8fe516ffbd278d162306920d182be9e76c8427;hb=fcfc3353eb4053c02f64c80a14ae142d44388a71;hp=6a65f3803d0825b29052942873e13ff5edbe7460;hpb=59ca7cfff23a8ad84efaf61ab8c1015e67e97c24;p=poolifier.git diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index 6a65f380..ec8fe516 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -8,7 +8,7 @@ import { fileURLToPath } from 'node:url' import { expect } from 'expect' import { restore, stub } from 'sinon' -import { CircularArray } from '../../lib/circular-array.cjs' +import { CircularBuffer } from '../../lib/circular-buffer.cjs' import { DynamicClusterPool, DynamicThreadPool, @@ -20,7 +20,7 @@ import { WorkerTypes } from '../../lib/index.cjs' import { WorkerNode } from '../../lib/pools/worker-node.cjs' -import { PriorityQueue } from '../../lib/priority-queue.cjs' +import { defaultBucketSize, PriorityQueue } from '../../lib/priority-queue.cjs' import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs' import { waitPoolEvents } from '../test-utils.cjs' @@ -275,7 +275,7 @@ describe('Abstract pool test suite', () => { concurrency: 2, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true, + tasksStealingOnBackPressure: false, tasksFinishedTimeout: 2000 }, workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED, @@ -591,7 +591,7 @@ describe('Abstract pool test suite', () => { concurrency: 1, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true, + tasksStealingOnBackPressure: false, tasksFinishedTimeout: 2000 }) pool.enableTasksQueue(true, { concurrency: 2 }) @@ -600,7 +600,7 @@ describe('Abstract pool test suite', () => { concurrency: 2, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true, + tasksStealingOnBackPressure: false, tasksFinishedTimeout: 2000 }) pool.enableTasksQueue(false) @@ -619,7 +619,7 @@ describe('Abstract pool test suite', () => { concurrency: 1, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true, + tasksStealingOnBackPressure: false, tasksFinishedTimeout: 2000 }) for (const workerNode of pool.workerNodes) { @@ -761,17 +761,17 @@ describe('Abstract pool test suite', () => { failed: 0 }, runTime: { - history: new CircularArray() + history: expect.any(CircularBuffer) }, waitTime: { - history: new CircularArray() + history: expect.any(CircularBuffer) }, elu: { idle: { - history: new CircularArray() + history: expect.any(CircularBuffer) }, active: { - history: new CircularArray() + history: expect.any(CircularBuffer) } } }) @@ -789,7 +789,8 @@ describe('Abstract pool test suite', () => { expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue) expect(workerNode.tasksQueue.size).toBe(0) expect(workerNode.tasksQueue.maxSize).toBe(0) - expect(workerNode.tasksQueue.k).toBe(numberOfWorkers * 2) + expect(workerNode.tasksQueue.bucketSize).toBe(defaultBucketSize) + expect(workerNode.tasksQueue.enablePriority).toBe(false) } await pool.destroy() pool = new DynamicThreadPool( @@ -802,7 +803,8 @@ describe('Abstract pool test suite', () => { expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue) expect(workerNode.tasksQueue.size).toBe(0) expect(workerNode.tasksQueue.maxSize).toBe(0) - expect(workerNode.tasksQueue.k).toBe(numberOfWorkers * 2) + expect(workerNode.tasksQueue.bucketSize).toBe(defaultBucketSize) + expect(workerNode.tasksQueue.enablePriority).toBe(false) } await pool.destroy() }) @@ -819,7 +821,8 @@ describe('Abstract pool test suite', () => { type: WorkerTypes.cluster, dynamic: false, ready: true, - stealing: false + stealing: false, + backPressure: false }) } await pool.destroy() @@ -835,7 +838,8 @@ describe('Abstract pool test suite', () => { type: WorkerTypes.thread, dynamic: false, ready: true, - stealing: false + stealing: false, + backPressure: false }) } await pool.destroy() @@ -931,17 +935,17 @@ describe('Abstract pool test suite', () => { failed: 0 }, runTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, waitTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, elu: { idle: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, active: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) } } }) @@ -959,17 +963,17 @@ describe('Abstract pool test suite', () => { failed: 0 }, runTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, waitTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, elu: { idle: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, active: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) } } }) @@ -1001,17 +1005,17 @@ describe('Abstract pool test suite', () => { failed: 0 }, runTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, waitTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, elu: { idle: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, active: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) } } }) @@ -1019,10 +1023,6 @@ describe('Abstract pool test suite', () => { expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( numberOfWorkers * maxMultiplier ) - expect(workerNode.usage.runTime.history.length).toBe(0) - expect(workerNode.usage.waitTime.history.length).toBe(0) - expect(workerNode.usage.elu.idle.history.length).toBe(0) - expect(workerNode.usage.elu.active.history.length).toBe(0) } pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE) for (const workerNode of pool.workerNodes) { @@ -1037,17 +1037,17 @@ describe('Abstract pool test suite', () => { failed: 0 }, runTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, waitTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, elu: { idle: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, active: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) } } }) @@ -1055,10 +1055,6 @@ describe('Abstract pool test suite', () => { expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( numberOfWorkers * maxMultiplier ) - expect(workerNode.usage.runTime.history.length).toBe(0) - expect(workerNode.usage.waitTime.history.length).toBe(0) - expect(workerNode.usage.elu.idle.history.length).toBe(0) - expect(workerNode.usage.elu.active.history.length).toBe(0) } await pool.destroy() }) @@ -1450,17 +1446,17 @@ describe('Abstract pool test suite', () => { failed: 0 }, runTime: { - history: new CircularArray() + history: expect.any(CircularBuffer) }, waitTime: { - history: new CircularArray() + history: expect.any(CircularBuffer) }, elu: expect.objectContaining({ idle: expect.objectContaining({ - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }), active: expect.objectContaining({ - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }) }) }) @@ -1667,6 +1663,78 @@ describe('Abstract pool test suite', () => { { name: 'fibonacci' } ]) expect(workerNode.taskFunctionsUsage.size).toBe(3) + expect(workerNode.usage.tasks.executed).toBeGreaterThan(0) + expect(workerNode.tasksQueue.enablePriority).toBe(false) + for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) { + expect( + workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name) + ).toStrictEqual({ + tasks: { + executed: expect.any(Number), + executing: 0, + failed: 0, + queued: 0, + sequentiallyStolen: 0, + stolen: 0 + }, + runTime: { + history: expect.any(CircularBuffer) + }, + waitTime: { + history: expect.any(CircularBuffer) + }, + elu: { + idle: { + history: expect.any(CircularBuffer) + }, + active: { + history: expect.any(CircularBuffer) + } + } + }) + expect( + workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name) + .tasks.executed + ).toBeGreaterThan(0) + } + expect( + workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME) + ).toStrictEqual( + workerNode.getTaskFunctionWorkerUsage( + workerNode.info.taskFunctionsProperties[1].name + ) + ) + } + await pool.destroy() + }) + + it('Verify that task function objects worker is working', async () => { + const pool = new DynamicThreadPool( + Math.floor(numberOfWorkers / 2), + numberOfWorkers, + './tests/worker-files/thread/testTaskFunctionObjectsWorker.mjs' + ) + const data = { n: 10 } + const result0 = await pool.execute(data) + expect(result0).toStrictEqual({ ok: 1 }) + const result1 = await pool.execute(data, 'jsonIntegerSerialization') + expect(result1).toStrictEqual({ ok: 1 }) + const result2 = await pool.execute(data, 'factorial') + expect(result2).toBe(3628800) + const result3 = await pool.execute(data, 'fibonacci') + expect(result3).toBe(55) + expect(pool.info.executingTasks).toBe(0) + expect(pool.info.executedTasks).toBe(4) + for (const workerNode of pool.workerNodes) { + expect(workerNode.info.taskFunctionsProperties).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'jsonIntegerSerialization' }, + { name: 'factorial' }, + { name: 'fibonacci', priority: -5 } + ]) + expect(workerNode.taskFunctionsUsage.size).toBe(3) + expect(workerNode.usage.tasks.executed).toBeGreaterThan(0) + expect(workerNode.tasksQueue.enablePriority).toBe(true) for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) { expect( workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name) @@ -1680,17 +1748,17 @@ describe('Abstract pool test suite', () => { stolen: 0 }, runTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, waitTime: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, elu: { idle: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) }, active: { - history: expect.any(CircularArray) + history: expect.any(CircularBuffer) } } })