X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=tests%2Fpools%2Fabstract-pool.test.mjs;h=e79795188b272fe1842f539673f3f23841dff90f;hb=e41b02718bb9355f7934f0198568cc0ac6f70f9a;hp=ccd2423102427a40878faf2ee347aa20f3f58964;hpb=39618ede0e08d380c1ac82005bcc2ab1f3c227b6;p=poolifier.git diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index ccd24231..e7979518 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -1,10 +1,15 @@ +// eslint-disable-next-line n/no-unsupported-features/node-builtins +import { createHook, executionAsyncId } from 'node:async_hooks' import { EventEmitterAsyncResource } from 'node:events' -import { dirname, join } from 'node:path' import { readFileSync } from 'node:fs' +import { dirname, join } from 'node:path' import { fileURLToPath } from 'node:url' -import { createHook, executionAsyncId } from 'node:async_hooks' + import { expect } from 'expect' import { restore, stub } from 'sinon' + +import { CircularArray } from '../../lib/circular-array.cjs' +import { Deque } from '../../lib/deque.cjs' import { DynamicClusterPool, DynamicThreadPool, @@ -15,11 +20,9 @@ import { WorkerChoiceStrategies, WorkerTypes } from '../../lib/index.cjs' -import { CircularArray } from '../../lib/circular-array.cjs' -import { Deque } from '../../lib/deque.cjs' +import { WorkerNode } from '../../lib/pools/worker-node.cjs' import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs' import { waitPoolEvents } from '../test-utils.cjs' -import { WorkerNode } from '../../lib/pools/worker-node.cjs' describe('Abstract pool test suite', () => { const version = JSON.parse( @@ -221,6 +224,7 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testWorker.mjs' ) expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource) + expect(pool.emitter.eventNames()).toStrictEqual([]) expect(pool.opts).toStrictEqual({ startWorkers: true, enableEvents: true, @@ -228,7 +232,7 @@ describe('Abstract pool test suite', () => { enableTasksQueue: false, workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }) - for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext + for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ runTime: { median: false }, @@ -284,7 +288,7 @@ describe('Abstract pool test suite', () => { errorHandler: testHandler, exitHandler: testHandler }) - for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext + for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ runTime: { median: true }, @@ -443,7 +447,7 @@ describe('Abstract pool test suite', () => { { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE } ) expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined() - for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext + for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ runTime: { median: false }, @@ -456,7 +460,7 @@ describe('Abstract pool test suite', () => { }) } expect( - pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements() ).toStrictEqual({ runTime: { aggregate: true, @@ -482,7 +486,7 @@ describe('Abstract pool test suite', () => { runTime: { median: true }, elu: { median: true } }) - for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext + for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ runTime: { median: true }, @@ -495,7 +499,7 @@ describe('Abstract pool test suite', () => { }) } expect( - pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements() ).toStrictEqual({ runTime: { aggregate: true, @@ -521,7 +525,7 @@ describe('Abstract pool test suite', () => { runTime: { median: false }, elu: { median: false } }) - for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext + for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ runTime: { median: false }, @@ -534,7 +538,7 @@ describe('Abstract pool test suite', () => { }) } expect( - pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements() ).toStrictEqual({ runTime: { aggregate: true, @@ -702,7 +706,8 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.thread, started: true, ready: true, - strategy: WorkerChoiceStrategies.ROUND_ROBIN, + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, + strategyRetries: 0, minSize: numberOfWorkers, maxSize: numberOfWorkers, workerNodes: numberOfWorkers, @@ -724,7 +729,8 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.cluster, started: true, ready: true, - strategy: WorkerChoiceStrategies.ROUND_ROBIN, + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, + strategyRetries: 0, minSize: Math.floor(numberOfWorkers / 2), maxSize: numberOfWorkers, workerNodes: Math.floor(numberOfWorkers / 2), @@ -861,8 +867,8 @@ describe('Abstract pool test suite', () => { ) expect(pool.info.started).toBe(false) expect(pool.info.ready).toBe(false) - expect(pool.readyEventEmitted).toBe(false) expect(pool.workerNodes).toStrictEqual([]) + expect(pool.readyEventEmitted).toBe(false) await expect(pool.execute()).rejects.toThrow( new Error('Cannot execute a task on not started pool') ) @@ -969,7 +975,7 @@ describe('Abstract pool test suite', () => { await pool.destroy() }) - it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => { + it("Verify that pool worker tasks usage aren't reset at worker choice strategy change", async () => { const pool = new DynamicThreadPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, @@ -1020,7 +1026,7 @@ describe('Abstract pool test suite', () => { for (const workerNode of pool.workerNodes) { expect(workerNode.usage).toStrictEqual({ tasks: { - executed: 0, + executed: expect.any(Number), executing: 0, queued: 0, maxQueued: 0, @@ -1043,6 +1049,10 @@ describe('Abstract pool test suite', () => { } } }) + expect(workerNode.usage.tasks.executed).toBeGreaterThan(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + numberOfWorkers * maxMultiplier + ) expect(workerNode.usage.runTime.history.length).toBe(0) expect(workerNode.usage.waitTime.history.length).toBe(0) expect(workerNode.usage.elu.idle.history.length).toBe(0) @@ -1073,7 +1083,8 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.cluster, started: true, ready: true, - strategy: WorkerChoiceStrategies.ROUND_ROBIN, + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, + strategyRetries: expect.any(Number), minSize: expect.any(Number), maxSize: expect.any(Number), workerNodes: expect.any(Number), @@ -1113,7 +1124,8 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.thread, started: true, ready: true, - strategy: WorkerChoiceStrategies.ROUND_ROBIN, + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, + strategyRetries: expect.any(Number), minSize: expect.any(Number), maxSize: expect.any(Number), workerNodes: expect.any(Number), @@ -1152,7 +1164,8 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.thread, started: true, ready: true, - strategy: WorkerChoiceStrategies.ROUND_ROBIN, + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, + strategyRetries: expect.any(Number), minSize: expect.any(Number), maxSize: expect.any(Number), workerNodes: expect.any(Number), @@ -1194,7 +1207,8 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.thread, started: true, ready: true, - strategy: WorkerChoiceStrategies.ROUND_ROBIN, + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, + strategyRetries: expect.any(Number), minSize: expect.any(Number), maxSize: expect.any(Number), workerNodes: expect.any(Number), @@ -1239,7 +1253,7 @@ describe('Abstract pool test suite', () => { const elapsedTime = performance.now() - startTime expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier) expect(elapsedTime).toBeGreaterThanOrEqual(2000) - expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 400) + expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800) }) it('Verify that destroy() waits until the tasks finished timeout is reached', async () => { @@ -1353,29 +1367,42 @@ describe('Abstract pool test suite', () => { new TypeError('name argument must not be an empty string') ) await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow( - new TypeError('fn argument must be a function') + new TypeError('taskFunction property must be a function') ) await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow( - new TypeError('fn argument must be a function') + new TypeError('taskFunction property must be a function') ) - expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'test' + expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'test' } ]) + expect([ + ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys() + ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN]) const echoTaskFunction = data => { return data } await expect( - dynamicThreadPool.addTaskFunction('echo', echoTaskFunction) + dynamicThreadPool.addTaskFunction('echo', { + taskFunction: echoTaskFunction, + strategy: WorkerChoiceStrategies.LEAST_ELU + }) ).resolves.toBe(true) expect(dynamicThreadPool.taskFunctions.size).toBe(1) - expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual( - echoTaskFunction - ) - expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'test', - 'echo' + expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({ + taskFunction: echoTaskFunction, + strategy: WorkerChoiceStrategies.LEAST_ELU + }) + expect([ + ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys() + ]).toStrictEqual([ + WorkerChoiceStrategies.ROUND_ROBIN, + WorkerChoiceStrategies.LEAST_ELU + ]) + expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'test' }, + { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU } ]) const taskFunctionData = { test: 'test' } const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo') @@ -1398,9 +1425,15 @@ describe('Abstract pool test suite', () => { }, elu: { idle: { + aggregate: 0, + maximum: 0, + minimum: 0, history: new CircularArray() }, active: { + aggregate: 0, + maximum: 0, + minimum: 0, history: new CircularArray() } } @@ -1416,9 +1449,9 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testWorker.mjs' ) await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1) - expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'test' + expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'test' } ]) await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow( new Error('Cannot remove a task function not handled on the pool side') @@ -1426,40 +1459,53 @@ describe('Abstract pool test suite', () => { const echoTaskFunction = data => { return data } - await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction) + await dynamicThreadPool.addTaskFunction('echo', { + taskFunction: echoTaskFunction, + strategy: WorkerChoiceStrategies.LEAST_ELU + }) expect(dynamicThreadPool.taskFunctions.size).toBe(1) - expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual( - echoTaskFunction - ) - expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'test', - 'echo' + expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({ + taskFunction: echoTaskFunction, + strategy: WorkerChoiceStrategies.LEAST_ELU + }) + expect([ + ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys() + ]).toStrictEqual([ + WorkerChoiceStrategies.ROUND_ROBIN, + WorkerChoiceStrategies.LEAST_ELU + ]) + expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'test' }, + { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU } ]) await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe( true ) expect(dynamicThreadPool.taskFunctions.size).toBe(0) expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined() - expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'test' + expect([ + ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys() + ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN]) + expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'test' } ]) await dynamicThreadPool.destroy() }) - it('Verify that listTaskFunctionNames() is working', async () => { + it('Verify that listTaskFunctionsProperties() is working', async () => { const dynamicThreadPool = new DynamicThreadPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs' ) await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1) - expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'jsonIntegerSerialization', - 'factorial', - 'fibonacci' + expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'jsonIntegerSerialization' }, + { name: 'factorial' }, + { name: 'fibonacci' } ]) await dynamicThreadPool.destroy() const fixedClusterPool = new FixedClusterPool( @@ -1467,11 +1513,11 @@ describe('Abstract pool test suite', () => { './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs' ) await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1) - expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'jsonIntegerSerialization', - 'factorial', - 'fibonacci' + expect(fixedClusterPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'jsonIntegerSerialization' }, + { name: 'factorial' }, + { name: 'fibonacci' } ]) await fixedClusterPool.destroy() }) @@ -1503,29 +1549,29 @@ describe('Abstract pool test suite', () => { `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'` ) ) - expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'jsonIntegerSerialization', - 'factorial', - 'fibonacci' + expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'jsonIntegerSerialization' }, + { name: 'factorial' }, + { name: 'fibonacci' } ]) await expect( dynamicThreadPool.setDefaultTaskFunction('factorial') ).resolves.toBe(true) - expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'factorial', - 'jsonIntegerSerialization', - 'fibonacci' + expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'factorial' }, + { name: 'jsonIntegerSerialization' }, + { name: 'fibonacci' } ]) await expect( dynamicThreadPool.setDefaultTaskFunction('fibonacci') ).resolves.toBe(true) - expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'fibonacci', - 'jsonIntegerSerialization', - 'factorial' + expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'fibonacci' }, + { name: 'jsonIntegerSerialization' }, + { name: 'factorial' } ]) await dynamicThreadPool.destroy() }) @@ -1548,15 +1594,17 @@ describe('Abstract pool test suite', () => { expect(pool.info.executingTasks).toBe(0) expect(pool.info.executedTasks).toBe(4) for (const workerNode of pool.workerNodes) { - expect(workerNode.info.taskFunctionNames).toStrictEqual([ - DEFAULT_TASK_NAME, - 'jsonIntegerSerialization', - 'factorial', - 'fibonacci' + expect(workerNode.info.taskFunctionsProperties).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'jsonIntegerSerialization' }, + { name: 'factorial' }, + { name: 'fibonacci' } ]) expect(workerNode.taskFunctionsUsage.size).toBe(3) - for (const name of pool.listTaskFunctionNames()) { - expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({ + for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) { + expect( + workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name) + ).toStrictEqual({ tasks: { executed: expect.any(Number), executing: 0, @@ -1581,14 +1629,15 @@ describe('Abstract pool test suite', () => { } }) expect( - workerNode.getTaskFunctionWorkerUsage(name).tasks.executed + workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name) + .tasks.executed ).toBeGreaterThan(0) } expect( workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME) ).toStrictEqual( workerNode.getTaskFunctionWorkerUsage( - workerNode.info.taskFunctionNames[1] + workerNode.info.taskFunctionsProperties[1].name ) ) } @@ -1618,13 +1667,17 @@ describe('Abstract pool test suite', () => { await expect( pool.sendTaskFunctionOperationToWorker(workerNodeKey, { taskFunctionOperation: 'add', - taskFunctionName: 'empty', + taskFunctionProperties: { name: 'empty' }, taskFunction: (() => {}).toString() }) ).resolves.toBe(true) expect( - pool.workerNodes[workerNodeKey].info.taskFunctionNames - ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty']) + pool.workerNodes[workerNodeKey].info.taskFunctionsProperties + ).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'test' }, + { name: 'empty' } + ]) await pool.destroy() }) @@ -1637,15 +1690,15 @@ describe('Abstract pool test suite', () => { await expect( pool.sendTaskFunctionOperationToWorkers({ taskFunctionOperation: 'add', - taskFunctionName: 'empty', + taskFunctionProperties: { name: 'empty' }, taskFunction: (() => {}).toString() }) ).resolves.toBe(true) for (const workerNode of pool.workerNodes) { - expect(workerNode.info.taskFunctionNames).toStrictEqual([ - DEFAULT_TASK_NAME, - 'test', - 'empty' + expect(workerNode.info.taskFunctionsProperties).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'test' }, + { name: 'empty' } ]) } await pool.destroy()