X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=tests%2Fpools%2Fabstract-pool.test.mjs;h=e79795188b272fe1842f539673f3f23841dff90f;hb=f7a08a34fc67a4c4406798c85f1ad669a7d53223;hp=84212a3f4f81f953fe50a718cac4f5f0edba1cdf;hpb=b2fd3f4a217ac09110da00b3b65dcdd36de4bf74;p=poolifier.git diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index 84212a3f..e7979518 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -1,7 +1,15 @@ +// eslint-disable-next-line n/no-unsupported-features/node-builtins +import { createHook, executionAsyncId } from 'node:async_hooks' import { EventEmitterAsyncResource } from 'node:events' import { readFileSync } from 'node:fs' +import { dirname, join } from 'node:path' +import { fileURLToPath } from 'node:url' + import { expect } from 'expect' import { restore, stub } from 'sinon' + +import { CircularArray } from '../../lib/circular-array.cjs' +import { Deque } from '../../lib/deque.cjs' import { DynamicClusterPool, DynamicThreadPool, @@ -11,15 +19,18 @@ import { PoolTypes, WorkerChoiceStrategies, WorkerTypes -} from '../../lib/index.js' -import { CircularArray } from '../../lib/circular-array.js' -import { Deque } from '../../lib/deque.js' -import { DEFAULT_TASK_NAME } from '../../lib/utils.js' -import { waitPoolEvents } from '../test-utils.js' -import { WorkerNode } from '../../lib/pools/worker-node.js' +} from '../../lib/index.cjs' +import { WorkerNode } from '../../lib/pools/worker-node.cjs' +import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs' +import { waitPoolEvents } from '../test-utils.cjs' describe('Abstract pool test suite', () => { - const version = JSON.parse(readFileSync('./package.json', 'utf8')).version + const version = JSON.parse( + readFileSync( + join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'), + 'utf8' + ) + ).version const numberOfWorkers = 2 class StubPoolWithIsMain extends FixedThreadPool { isMain () { @@ -31,7 +42,16 @@ describe('Abstract pool test suite', () => { restore() }) - it('Simulate pool creation from a non main thread/process', () => { + it('Verify that pool can be created and destroyed', async () => { + const pool = new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/testWorker.mjs' + ) + expect(pool).toBeInstanceOf(FixedThreadPool) + await pool.destroy() + }) + + it('Verify that pool cannot be created from a non main thread/process', () => { expect( () => new StubPoolWithIsMain( @@ -41,7 +61,7 @@ describe('Abstract pool test suite', () => { errorHandler: e => console.error(e) } ) - ).toThrowError( + ).toThrow( new Error( 'Cannot start a pool from a worker with the same type as the pool' ) @@ -53,30 +73,22 @@ describe('Abstract pool test suite', () => { numberOfWorkers, './tests/worker-files/thread/testWorker.mjs' ) - expect(pool.starting).toBe(false) expect(pool.started).toBe(true) + expect(pool.starting).toBe(false) + expect(pool.destroying).toBe(false) await pool.destroy() }) it('Verify that filePath is checked', () => { - const expectedError = new Error( - 'Please specify a file with a worker implementation' + expect(() => new FixedThreadPool(numberOfWorkers)).toThrow( + new TypeError('The worker file path must be specified') ) - expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError( - expectedError - ) - expect(() => new FixedThreadPool(numberOfWorkers, '')).toThrowError( - expectedError - ) - expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrowError( - expectedError - ) - expect(() => new FixedThreadPool(numberOfWorkers, true)).toThrowError( - expectedError + expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow( + new TypeError('The worker file path must be a string') ) expect( () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts') - ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'")) + ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'")) }) it('Verify that numberOfWorkers is checked', () => { @@ -86,7 +98,7 @@ describe('Abstract pool test suite', () => { undefined, './tests/worker-files/thread/testWorker.mjs' ) - ).toThrowError( + ).toThrow( new Error( 'Cannot instantiate a pool without specifying the number of workers' ) @@ -96,8 +108,8 @@ describe('Abstract pool test suite', () => { it('Verify that a negative number of workers is checked', () => { expect( () => - new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js') - ).toThrowError( + new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.cjs') + ).toThrow( new RangeError( 'Cannot instantiate a pool with a negative number of workers' ) @@ -108,22 +120,38 @@ describe('Abstract pool test suite', () => { expect( () => new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs') - ).toThrowError( + ).toThrow( new TypeError( 'Cannot instantiate a pool with a non safe integer number of workers' ) ) }) + it('Verify that pool arguments number and pool type are checked', () => { + expect( + () => + new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/testWorker.mjs', + undefined, + numberOfWorkers * 2 + ) + ).toThrow( + new Error( + 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization' + ) + ) + }) + it('Verify that dynamic pool sizing is checked', () => { expect( () => new DynamicClusterPool( 1, undefined, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) - ).toThrowError( + ).toThrow( new TypeError( 'Cannot instantiate a dynamic pool without specifying the maximum pool size' ) @@ -135,7 +163,7 @@ describe('Abstract pool test suite', () => { 1, './tests/worker-files/thread/testWorker.mjs' ) - ).toThrowError( + ).toThrow( new TypeError( 'Cannot instantiate a pool with a non safe integer number of workers' ) @@ -145,9 +173,9 @@ describe('Abstract pool test suite', () => { new DynamicClusterPool( 0, 0.5, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) - ).toThrowError( + ).toThrow( new TypeError( 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size' ) @@ -159,7 +187,7 @@ describe('Abstract pool test suite', () => { 1, './tests/worker-files/thread/testWorker.mjs' ) - ).toThrowError( + ).toThrow( new RangeError( 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size' ) @@ -171,7 +199,7 @@ describe('Abstract pool test suite', () => { 0, './tests/worker-files/thread/testWorker.mjs' ) - ).toThrowError( + ).toThrow( new RangeError( 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero' ) @@ -181,9 +209,9 @@ describe('Abstract pool test suite', () => { new DynamicClusterPool( 1, 1, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) - ).toThrowError( + ).toThrow( new RangeError( 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead' ) @@ -196,32 +224,24 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testWorker.mjs' ) expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource) + expect(pool.emitter.eventNames()).toStrictEqual([]) expect(pool.opts).toStrictEqual({ startWorkers: true, enableEvents: true, restartWorkerOnError: true, enableTasksQueue: false, - workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN, - workerChoiceStrategyOptions: { - retries: 6, - runTime: { median: false }, - waitTime: { median: false }, - elu: { median: false } - } - }) - expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 6, - runTime: { median: false }, - waitTime: { median: false }, - elu: { median: false } + workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }) - for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext + for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ - retries: 6, runTime: { median: false }, waitTime: { median: false }, - elu: { median: false } + elu: { median: false }, + weights: expect.objectContaining({ + 0: expect.any(Number), + [pool.info.maxSize - 1]: expect.any(Number) + }) }) } await pool.destroy() @@ -255,14 +275,12 @@ describe('Abstract pool test suite', () => { concurrency: 2, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true + tasksStealingOnBackPressure: true, + tasksFinishedTimeout: 2000 }, workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED, workerChoiceStrategyOptions: { - retries: 6, runTime: { median: true }, - waitTime: { median: false }, - elu: { median: false }, weights: { 0: 300, 1: 200 } }, onlineHandler: testHandler, @@ -270,17 +288,9 @@ describe('Abstract pool test suite', () => { errorHandler: testHandler, exitHandler: testHandler }) - expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 6, - runTime: { median: true }, - waitTime: { median: false }, - elu: { median: false }, - weights: { 0: 300, 1: 200 } - }) - for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext + for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ - retries: 6, runTime: { median: true }, waitTime: { median: false }, elu: { median: false }, @@ -290,7 +300,7 @@ describe('Abstract pool test suite', () => { await pool.destroy() }) - it('Verify that pool options are validated', async () => { + it('Verify that pool options are validated', () => { expect( () => new FixedThreadPool( @@ -300,41 +310,7 @@ describe('Abstract pool test suite', () => { workerChoiceStrategy: 'invalidStrategy' } ) - ).toThrowError( - new Error("Invalid worker choice strategy 'invalidStrategy'") - ) - expect( - () => - new FixedThreadPool( - numberOfWorkers, - './tests/worker-files/thread/testWorker.mjs', - { - workerChoiceStrategyOptions: { - retries: 'invalidChoiceRetries' - } - } - ) - ).toThrowError( - new TypeError( - 'Invalid worker choice strategy options: retries must be an integer' - ) - ) - expect( - () => - new FixedThreadPool( - numberOfWorkers, - './tests/worker-files/thread/testWorker.mjs', - { - workerChoiceStrategyOptions: { - retries: -1 - } - } - ) - ).toThrowError( - new RangeError( - "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero" - ) - ) + ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'")) expect( () => new FixedThreadPool( @@ -344,7 +320,7 @@ describe('Abstract pool test suite', () => { workerChoiceStrategyOptions: { weights: {} } } ) - ).toThrowError( + ).toThrow( new Error( 'Invalid worker choice strategy options: must have a weight for each worker node' ) @@ -358,7 +334,7 @@ describe('Abstract pool test suite', () => { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } } ) - ).toThrowError( + ).toThrow( new Error( "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'" ) @@ -373,7 +349,7 @@ describe('Abstract pool test suite', () => { tasksQueueOptions: 'invalidTasksQueueOptions' } ) - ).toThrowError( + ).toThrow( new TypeError('Invalid tasks queue options: must be a plain object') ) expect( @@ -386,7 +362,7 @@ describe('Abstract pool test suite', () => { tasksQueueOptions: { concurrency: 0 } } ) - ).toThrowError( + ).toThrow( new RangeError( 'Invalid worker node tasks concurrency: 0 is a negative integer or zero' ) @@ -401,7 +377,7 @@ describe('Abstract pool test suite', () => { tasksQueueOptions: { concurrency: -1 } } ) - ).toThrowError( + ).toThrow( new RangeError( 'Invalid worker node tasks concurrency: -1 is a negative integer or zero' ) @@ -416,7 +392,7 @@ describe('Abstract pool test suite', () => { tasksQueueOptions: { concurrency: 0.2 } } ) - ).toThrowError( + ).toThrow( new TypeError('Invalid worker node tasks concurrency: must be an integer') ) expect( @@ -429,7 +405,7 @@ describe('Abstract pool test suite', () => { tasksQueueOptions: { size: 0 } } ) - ).toThrowError( + ).toThrow( new RangeError( 'Invalid worker node tasks queue size: 0 is a negative integer or zero' ) @@ -444,7 +420,7 @@ describe('Abstract pool test suite', () => { tasksQueueOptions: { size: -1 } } ) - ).toThrowError( + ).toThrow( new RangeError( 'Invalid worker node tasks queue size: -1 is a negative integer or zero' ) @@ -459,7 +435,7 @@ describe('Abstract pool test suite', () => { tasksQueueOptions: { size: 0.2 } } ) - ).toThrowError( + ).toThrow( new TypeError('Invalid worker node tasks queue size: must be an integer') ) }) @@ -470,29 +446,21 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testWorker.mjs', { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE } ) - expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ - retries: 6, - runTime: { median: false }, - waitTime: { median: false }, - elu: { median: false } - }) - expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 6, - runTime: { median: false }, - waitTime: { median: false }, - elu: { median: false } - }) - for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext + expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined() + for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ - retries: 6, runTime: { median: false }, waitTime: { median: false }, - elu: { median: false } + elu: { median: false }, + weights: expect.objectContaining({ + 0: expect.any(Number), + [pool.info.maxSize - 1]: expect.any(Number) + }) }) } expect( - pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements() ).toStrictEqual({ runTime: { aggregate: true, @@ -515,28 +483,23 @@ describe('Abstract pool test suite', () => { elu: { median: true } }) expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ - retries: 6, - runTime: { median: true }, - waitTime: { median: false }, - elu: { median: true } - }) - expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 6, runTime: { median: true }, - waitTime: { median: false }, elu: { median: true } }) - for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext + for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ - retries: 6, runTime: { median: true }, waitTime: { median: false }, - elu: { median: true } + elu: { median: true }, + weights: expect.objectContaining({ + 0: expect.any(Number), + [pool.info.maxSize - 1]: expect.any(Number) + }) }) } expect( - pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements() ).toStrictEqual({ runTime: { aggregate: true, @@ -559,28 +522,23 @@ describe('Abstract pool test suite', () => { elu: { median: false } }) expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({ - retries: 6, - runTime: { median: false }, - waitTime: { median: false }, - elu: { median: false } - }) - expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({ - retries: 6, runTime: { median: false }, - waitTime: { median: false }, elu: { median: false } }) - for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext + for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ - retries: 6, runTime: { median: false }, waitTime: { median: false }, - elu: { median: false } + elu: { median: false }, + weights: expect.objectContaining({ + 0: expect.any(Number), + [pool.info.maxSize - 1]: expect.any(Number) + }) }) } expect( - pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements() ).toStrictEqual({ runTime: { aggregate: true, @@ -600,37 +558,19 @@ describe('Abstract pool test suite', () => { }) expect(() => pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions') - ).toThrowError( + ).toThrow( new TypeError( 'Invalid worker choice strategy options: must be a plain object' ) ) - expect(() => - pool.setWorkerChoiceStrategyOptions({ - retries: 'invalidChoiceRetries' - }) - ).toThrowError( - new TypeError( - 'Invalid worker choice strategy options: retries must be an integer' - ) - ) - expect(() => - pool.setWorkerChoiceStrategyOptions({ retries: -1 }) - ).toThrowError( - new RangeError( - "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero" - ) - ) - expect(() => - pool.setWorkerChoiceStrategyOptions({ weights: {} }) - ).toThrowError( + expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow( new Error( 'Invalid worker choice strategy options: must have a weight for each worker node' ) ) expect(() => pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' }) - ).toThrowError( + ).toThrow( new Error( "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'" ) @@ -645,41 +585,27 @@ describe('Abstract pool test suite', () => { ) expect(pool.opts.enableTasksQueue).toBe(false) expect(pool.opts.tasksQueueOptions).toBeUndefined() - for (const workerNode of pool.workerNodes) { - expect(workerNode.onEmptyQueue).toBeUndefined() - expect(workerNode.onBackPressure).toBeUndefined() - } pool.enableTasksQueue(true) expect(pool.opts.enableTasksQueue).toBe(true) expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true + tasksStealingOnBackPressure: true, + tasksFinishedTimeout: 2000 }) - for (const workerNode of pool.workerNodes) { - expect(workerNode.onEmptyQueue).toBeInstanceOf(Function) - expect(workerNode.onBackPressure).toBeInstanceOf(Function) - } pool.enableTasksQueue(true, { concurrency: 2 }) expect(pool.opts.enableTasksQueue).toBe(true) expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true + tasksStealingOnBackPressure: true, + tasksFinishedTimeout: 2000 }) - for (const workerNode of pool.workerNodes) { - expect(workerNode.onEmptyQueue).toBeInstanceOf(Function) - expect(workerNode.onBackPressure).toBeInstanceOf(Function) - } pool.enableTasksQueue(false) expect(pool.opts.enableTasksQueue).toBe(false) expect(pool.opts.tasksQueueOptions).toBeUndefined() - for (const workerNode of pool.workerNodes) { - expect(workerNode.onEmptyQueue).toBeUndefined() - expect(workerNode.onBackPressure).toBeUndefined() - } await pool.destroy() }) @@ -693,33 +619,32 @@ describe('Abstract pool test suite', () => { concurrency: 1, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true + tasksStealingOnBackPressure: true, + tasksFinishedTimeout: 2000 }) for (const workerNode of pool.workerNodes) { expect(workerNode.tasksQueueBackPressureSize).toBe( pool.opts.tasksQueueOptions.size ) - expect(workerNode.onEmptyQueue).toBeInstanceOf(Function) - expect(workerNode.onBackPressure).toBeInstanceOf(Function) } pool.setTasksQueueOptions({ concurrency: 2, size: 2, taskStealing: false, - tasksStealingOnBackPressure: false + tasksStealingOnBackPressure: false, + tasksFinishedTimeout: 3000 }) expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2, size: 2, taskStealing: false, - tasksStealingOnBackPressure: false + tasksStealingOnBackPressure: false, + tasksFinishedTimeout: 3000 }) for (const workerNode of pool.workerNodes) { expect(workerNode.tasksQueueBackPressureSize).toBe( pool.opts.tasksQueueOptions.size ) - expect(workerNode.onEmptyQueue).toBeUndefined() - expect(workerNode.onBackPressure).toBeUndefined() } pool.setTasksQueueOptions({ concurrency: 1, @@ -730,44 +655,41 @@ describe('Abstract pool test suite', () => { concurrency: 1, size: Math.pow(numberOfWorkers, 2), taskStealing: true, - tasksStealingOnBackPressure: true + tasksStealingOnBackPressure: true, + tasksFinishedTimeout: 2000 }) for (const workerNode of pool.workerNodes) { expect(workerNode.tasksQueueBackPressureSize).toBe( pool.opts.tasksQueueOptions.size ) - expect(workerNode.onEmptyQueue).toBeInstanceOf(Function) - expect(workerNode.onBackPressure).toBeInstanceOf(Function) } - expect(() => - pool.setTasksQueueOptions('invalidTasksQueueOptions') - ).toThrowError( + expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow( new TypeError('Invalid tasks queue options: must be a plain object') ) - expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError( + expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow( new RangeError( 'Invalid worker node tasks concurrency: 0 is a negative integer or zero' ) ) - expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrowError( + expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow( new RangeError( 'Invalid worker node tasks concurrency: -1 is a negative integer or zero' ) ) - expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError( + expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow( new TypeError('Invalid worker node tasks concurrency: must be an integer') ) - expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrowError( + expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow( new RangeError( 'Invalid worker node tasks queue size: 0 is a negative integer or zero' ) ) - expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrowError( + expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow( new RangeError( 'Invalid worker node tasks queue size: -1 is a negative integer or zero' ) ) - expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrowError( + expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow( new TypeError('Invalid worker node tasks queue size: must be an integer') ) await pool.destroy() @@ -784,7 +706,8 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.thread, started: true, ready: true, - strategy: WorkerChoiceStrategies.ROUND_ROBIN, + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, + strategyRetries: 0, minSize: numberOfWorkers, maxSize: numberOfWorkers, workerNodes: numberOfWorkers, @@ -798,7 +721,7 @@ describe('Abstract pool test suite', () => { pool = new DynamicClusterPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) expect(pool.info).toStrictEqual({ version, @@ -806,7 +729,8 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.cluster, started: true, ready: true, - strategy: WorkerChoiceStrategies.ROUND_ROBIN, + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, + strategyRetries: 0, minSize: Math.floor(numberOfWorkers / 2), maxSize: numberOfWorkers, workerNodes: Math.floor(numberOfWorkers / 2), @@ -822,7 +746,7 @@ describe('Abstract pool test suite', () => { it('Verify that pool worker tasks usage are initialized', async () => { const pool = new FixedClusterPool( numberOfWorkers, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) for (const workerNode of pool.workerNodes) { expect(workerNode).toBeInstanceOf(WorkerNode) @@ -832,6 +756,7 @@ describe('Abstract pool test suite', () => { executing: 0, queued: 0, maxQueued: 0, + sequentiallyStolen: 0, stolen: 0, failed: 0 }, @@ -857,7 +782,7 @@ describe('Abstract pool test suite', () => { it('Verify that pool worker tasks queue are initialized', async () => { let pool = new FixedClusterPool( numberOfWorkers, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) for (const workerNode of pool.workerNodes) { expect(workerNode).toBeInstanceOf(WorkerNode) @@ -883,7 +808,7 @@ describe('Abstract pool test suite', () => { it('Verify that pool worker info are initialized', async () => { let pool = new FixedClusterPool( numberOfWorkers, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) for (const workerNode of pool.workerNodes) { expect(workerNode).toBeInstanceOf(WorkerNode) @@ -891,7 +816,8 @@ describe('Abstract pool test suite', () => { id: expect.any(Number), type: WorkerTypes.cluster, dynamic: false, - ready: true + ready: true, + stealing: false }) } await pool.destroy() @@ -906,16 +832,35 @@ describe('Abstract pool test suite', () => { id: expect.any(Number), type: WorkerTypes.thread, dynamic: false, - ready: true + ready: true, + stealing: false }) } await pool.destroy() }) + it('Verify that pool statuses are checked at start or destroy', async () => { + const pool = new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/testWorker.mjs' + ) + expect(pool.info.started).toBe(true) + expect(pool.info.ready).toBe(true) + expect(() => pool.start()).toThrow( + new Error('Cannot start an already started pool') + ) + await pool.destroy() + expect(pool.info.started).toBe(false) + expect(pool.info.ready).toBe(false) + await expect(pool.destroy()).rejects.toThrow( + new Error('Cannot destroy an already destroyed pool') + ) + }) + it('Verify that pool can be started after initialization', async () => { const pool = new FixedClusterPool( numberOfWorkers, - './tests/worker-files/cluster/testWorker.js', + './tests/worker-files/cluster/testWorker.cjs', { startWorkers: false } @@ -923,12 +868,15 @@ describe('Abstract pool test suite', () => { expect(pool.info.started).toBe(false) expect(pool.info.ready).toBe(false) expect(pool.workerNodes).toStrictEqual([]) - await expect(pool.execute()).rejects.toThrowError( + expect(pool.readyEventEmitted).toBe(false) + await expect(pool.execute()).rejects.toThrow( new Error('Cannot execute a task on not started pool') ) pool.start() expect(pool.info.started).toBe(true) expect(pool.info.ready).toBe(true) + await waitPoolEvents(pool, PoolEvents.ready, 1) + expect(pool.readyEventEmitted).toBe(true) expect(pool.workerNodes.length).toBe(numberOfWorkers) for (const workerNode of pool.workerNodes) { expect(workerNode).toBeInstanceOf(WorkerNode) @@ -939,22 +887,22 @@ describe('Abstract pool test suite', () => { it('Verify that pool execute() arguments are checked', async () => { const pool = new FixedClusterPool( numberOfWorkers, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) - await expect(pool.execute(undefined, 0)).rejects.toThrowError( + await expect(pool.execute(undefined, 0)).rejects.toThrow( new TypeError('name argument must be a string') ) - await expect(pool.execute(undefined, '')).rejects.toThrowError( + await expect(pool.execute(undefined, '')).rejects.toThrow( new TypeError('name argument must not be an empty string') ) - await expect(pool.execute(undefined, undefined, {})).rejects.toThrowError( + await expect(pool.execute(undefined, undefined, {})).rejects.toThrow( new TypeError('transferList argument must be an array') ) await expect(pool.execute(undefined, 'unknown')).rejects.toBe( "Task function 'unknown' not found" ) await pool.destroy() - await expect(pool.execute()).rejects.toThrowError( + await expect(pool.execute()).rejects.toThrow( new Error('Cannot execute a task on not started pool') ) }) @@ -962,7 +910,7 @@ describe('Abstract pool test suite', () => { it('Verify that pool worker tasks usage are computed', async () => { const pool = new FixedClusterPool( numberOfWorkers, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) const promises = new Set() const maxMultiplier = 2 @@ -976,6 +924,7 @@ describe('Abstract pool test suite', () => { executing: maxMultiplier, queued: 0, maxQueued: 0, + sequentiallyStolen: 0, stolen: 0, failed: 0 }, @@ -1003,6 +952,7 @@ describe('Abstract pool test suite', () => { executing: 0, queued: 0, maxQueued: 0, + sequentiallyStolen: 0, stolen: 0, failed: 0 }, @@ -1025,7 +975,7 @@ describe('Abstract pool test suite', () => { await pool.destroy() }) - it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => { + it("Verify that pool worker tasks usage aren't reset at worker choice strategy change", async () => { const pool = new DynamicThreadPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, @@ -1044,6 +994,7 @@ describe('Abstract pool test suite', () => { executing: 0, queued: 0, maxQueued: 0, + sequentiallyStolen: 0, stolen: 0, failed: 0 }, @@ -1075,10 +1026,11 @@ describe('Abstract pool test suite', () => { for (const workerNode of pool.workerNodes) { expect(workerNode.usage).toStrictEqual({ tasks: { - executed: 0, + executed: expect.any(Number), executing: 0, queued: 0, maxQueued: 0, + sequentiallyStolen: 0, stolen: 0, failed: 0 }, @@ -1097,6 +1049,10 @@ describe('Abstract pool test suite', () => { } } }) + expect(workerNode.usage.tasks.executed).toBeGreaterThan(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + numberOfWorkers * maxMultiplier + ) expect(workerNode.usage.runTime.history.length).toBe(0) expect(workerNode.usage.waitTime.history.length).toBe(0) expect(workerNode.usage.elu.idle.history.length).toBe(0) @@ -1109,7 +1065,7 @@ describe('Abstract pool test suite', () => { const pool = new DynamicClusterPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) expect(pool.emitter.eventNames()).toStrictEqual([]) let poolInfo @@ -1127,7 +1083,8 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.cluster, started: true, ready: true, - strategy: WorkerChoiceStrategies.ROUND_ROBIN, + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, + strategyRetries: expect.any(Number), minSize: expect.any(Number), maxSize: expect.any(Number), workerNodes: expect.any(Number), @@ -1167,7 +1124,8 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.thread, started: true, ready: true, - strategy: WorkerChoiceStrategies.ROUND_ROBIN, + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, + strategyRetries: expect.any(Number), minSize: expect.any(Number), maxSize: expect.any(Number), workerNodes: expect.any(Number), @@ -1206,7 +1164,8 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.thread, started: true, ready: true, - strategy: WorkerChoiceStrategies.ROUND_ROBIN, + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, + strategyRetries: expect.any(Number), minSize: expect.any(Number), maxSize: expect.any(Number), workerNodes: expect.any(Number), @@ -1248,11 +1207,13 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.thread, started: true, ready: true, - strategy: WorkerChoiceStrategies.ROUND_ROBIN, + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, + strategyRetries: expect.any(Number), minSize: expect.any(Number), maxSize: expect.any(Number), workerNodes: expect.any(Number), idleWorkerNodes: expect.any(Number), + stealingWorkerNodes: expect.any(Number), busyWorkerNodes: expect.any(Number), executedTasks: expect.any(Number), executingTasks: expect.any(Number), @@ -1262,7 +1223,101 @@ describe('Abstract pool test suite', () => { stolenTasks: expect.any(Number), failedTasks: expect.any(Number) }) - expect(pool.hasBackPressure.called).toBe(true) + expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7) + await pool.destroy() + }) + + it('Verify that destroy() waits for queued tasks to finish', async () => { + const tasksFinishedTimeout = 2500 + const pool = new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/asyncWorker.mjs', + { + enableTasksQueue: true, + tasksQueueOptions: { tasksFinishedTimeout } + } + ) + const maxMultiplier = 4 + let tasksFinished = 0 + for (const workerNode of pool.workerNodes) { + workerNode.on('taskFinished', () => { + ++tasksFinished + }) + } + for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) { + pool.execute() + } + expect(pool.info.queuedTasks).toBeGreaterThan(0) + const startTime = performance.now() + await pool.destroy() + const elapsedTime = performance.now() - startTime + expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier) + expect(elapsedTime).toBeGreaterThanOrEqual(2000) + expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800) + }) + + it('Verify that destroy() waits until the tasks finished timeout is reached', async () => { + const tasksFinishedTimeout = 1000 + const pool = new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/asyncWorker.mjs', + { + enableTasksQueue: true, + tasksQueueOptions: { tasksFinishedTimeout } + } + ) + const maxMultiplier = 4 + let tasksFinished = 0 + for (const workerNode of pool.workerNodes) { + workerNode.on('taskFinished', () => { + ++tasksFinished + }) + } + for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) { + pool.execute() + } + expect(pool.info.queuedTasks).toBeGreaterThan(0) + const startTime = performance.now() + await pool.destroy() + const elapsedTime = performance.now() - startTime + expect(tasksFinished).toBe(0) + expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800) + }) + + it('Verify that pool asynchronous resource track tasks execution', async () => { + let taskAsyncId + let initCalls = 0 + let beforeCalls = 0 + let afterCalls = 0 + let resolveCalls = 0 + const hook = createHook({ + init (asyncId, type) { + if (type === 'poolifier:task') { + initCalls++ + taskAsyncId = asyncId + } + }, + before (asyncId) { + if (asyncId === taskAsyncId) beforeCalls++ + }, + after (asyncId) { + if (asyncId === taskAsyncId) afterCalls++ + }, + promiseResolve () { + if (executionAsyncId() === taskAsyncId) resolveCalls++ + } + }) + const pool = new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/testWorker.mjs' + ) + hook.enable() + await pool.execute() + hook.disable() + expect(initCalls).toBe(1) + expect(beforeCalls).toBe(1) + expect(afterCalls).toBe(1) + expect(resolveCalls).toBe(1) await pool.destroy() }) @@ -1283,7 +1338,7 @@ describe('Abstract pool test suite', () => { await dynamicThreadPool.destroy() const fixedClusterPool = new FixedClusterPool( numberOfWorkers, - './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js' + './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs' ) await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1) expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true) @@ -1305,36 +1360,49 @@ describe('Abstract pool test suite', () => { await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1) await expect( dynamicThreadPool.addTaskFunction(0, () => {}) - ).rejects.toThrowError(new TypeError('name argument must be a string')) + ).rejects.toThrow(new TypeError('name argument must be a string')) await expect( dynamicThreadPool.addTaskFunction('', () => {}) - ).rejects.toThrowError( + ).rejects.toThrow( new TypeError('name argument must not be an empty string') ) - await expect( - dynamicThreadPool.addTaskFunction('test', 0) - ).rejects.toThrowError(new TypeError('fn argument must be a function')) - await expect( - dynamicThreadPool.addTaskFunction('test', '') - ).rejects.toThrowError(new TypeError('fn argument must be a function')) - expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'test' + await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow( + new TypeError('taskFunction property must be a function') + ) + await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow( + new TypeError('taskFunction property must be a function') + ) + expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'test' } ]) + expect([ + ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys() + ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN]) const echoTaskFunction = data => { return data } await expect( - dynamicThreadPool.addTaskFunction('echo', echoTaskFunction) + dynamicThreadPool.addTaskFunction('echo', { + taskFunction: echoTaskFunction, + strategy: WorkerChoiceStrategies.LEAST_ELU + }) ).resolves.toBe(true) expect(dynamicThreadPool.taskFunctions.size).toBe(1) - expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual( - echoTaskFunction - ) - expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'test', - 'echo' + expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({ + taskFunction: echoTaskFunction, + strategy: WorkerChoiceStrategies.LEAST_ELU + }) + expect([ + ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys() + ]).toStrictEqual([ + WorkerChoiceStrategies.ROUND_ROBIN, + WorkerChoiceStrategies.LEAST_ELU + ]) + expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'test' }, + { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU } ]) const taskFunctionData = { test: 'test' } const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo') @@ -1345,6 +1413,7 @@ describe('Abstract pool test suite', () => { executed: expect.any(Number), executing: 0, queued: 0, + sequentiallyStolen: 0, stolen: 0, failed: 0 }, @@ -1356,9 +1425,15 @@ describe('Abstract pool test suite', () => { }, elu: { idle: { + aggregate: 0, + maximum: 0, + minimum: 0, history: new CircularArray() }, active: { + aggregate: 0, + maximum: 0, + minimum: 0, history: new CircularArray() } } @@ -1374,64 +1449,75 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testWorker.mjs' ) await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1) - expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'test' + expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'test' } ]) - await expect( - dynamicThreadPool.removeTaskFunction('test') - ).rejects.toThrowError( + await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow( new Error('Cannot remove a task function not handled on the pool side') ) const echoTaskFunction = data => { return data } - await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction) + await dynamicThreadPool.addTaskFunction('echo', { + taskFunction: echoTaskFunction, + strategy: WorkerChoiceStrategies.LEAST_ELU + }) expect(dynamicThreadPool.taskFunctions.size).toBe(1) - expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual( - echoTaskFunction - ) - expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'test', - 'echo' + expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({ + taskFunction: echoTaskFunction, + strategy: WorkerChoiceStrategies.LEAST_ELU + }) + expect([ + ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys() + ]).toStrictEqual([ + WorkerChoiceStrategies.ROUND_ROBIN, + WorkerChoiceStrategies.LEAST_ELU + ]) + expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'test' }, + { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU } ]) await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe( true ) expect(dynamicThreadPool.taskFunctions.size).toBe(0) expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined() - expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'test' + expect([ + ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys() + ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN]) + expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'test' } ]) await dynamicThreadPool.destroy() }) - it('Verify that listTaskFunctionNames() is working', async () => { + it('Verify that listTaskFunctionsProperties() is working', async () => { const dynamicThreadPool = new DynamicThreadPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs' ) await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1) - expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'jsonIntegerSerialization', - 'factorial', - 'fibonacci' + expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'jsonIntegerSerialization' }, + { name: 'factorial' }, + { name: 'fibonacci' } ]) await dynamicThreadPool.destroy() const fixedClusterPool = new FixedClusterPool( numberOfWorkers, - './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js' + './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs' ) await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1) - expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'jsonIntegerSerialization', - 'factorial', - 'fibonacci' + expect(fixedClusterPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'jsonIntegerSerialization' }, + { name: 'factorial' }, + { name: 'fibonacci' } ]) await fixedClusterPool.destroy() }) @@ -1443,58 +1529,58 @@ describe('Abstract pool test suite', () => { './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs' ) await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1) - await expect( - dynamicThreadPool.setDefaultTaskFunction(0) - ).rejects.toThrowError( + const workerId = dynamicThreadPool.workerNodes[0].info.id + await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow( new Error( - "Task function operation 'default' failed on worker 31 with error: 'TypeError: name parameter is not a string'" + `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'` ) ) await expect( dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME) - ).rejects.toThrowError( + ).rejects.toThrow( new Error( - "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function reserved name as the default task function'" + `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'` ) ) await expect( dynamicThreadPool.setDefaultTaskFunction('unknown') - ).rejects.toThrowError( + ).rejects.toThrow( new Error( - "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function to a non-existing task function'" + `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'` ) ) - expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'jsonIntegerSerialization', - 'factorial', - 'fibonacci' + expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'jsonIntegerSerialization' }, + { name: 'factorial' }, + { name: 'fibonacci' } ]) await expect( dynamicThreadPool.setDefaultTaskFunction('factorial') ).resolves.toBe(true) - expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'factorial', - 'jsonIntegerSerialization', - 'fibonacci' + expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'factorial' }, + { name: 'jsonIntegerSerialization' }, + { name: 'fibonacci' } ]) await expect( dynamicThreadPool.setDefaultTaskFunction('fibonacci') ).resolves.toBe(true) - expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([ - DEFAULT_TASK_NAME, - 'fibonacci', - 'jsonIntegerSerialization', - 'factorial' + expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'fibonacci' }, + { name: 'jsonIntegerSerialization' }, + { name: 'factorial' } ]) + await dynamicThreadPool.destroy() }) it('Verify that multiple task functions worker is working', async () => { const pool = new DynamicClusterPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, - './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js' + './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs' ) const data = { n: 10 } const result0 = await pool.execute(data) @@ -1508,20 +1594,23 @@ describe('Abstract pool test suite', () => { expect(pool.info.executingTasks).toBe(0) expect(pool.info.executedTasks).toBe(4) for (const workerNode of pool.workerNodes) { - expect(workerNode.info.taskFunctionNames).toStrictEqual([ - DEFAULT_TASK_NAME, - 'jsonIntegerSerialization', - 'factorial', - 'fibonacci' + expect(workerNode.info.taskFunctionsProperties).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'jsonIntegerSerialization' }, + { name: 'factorial' }, + { name: 'fibonacci' } ]) expect(workerNode.taskFunctionsUsage.size).toBe(3) - for (const name of pool.listTaskFunctionNames()) { - expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({ + for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) { + expect( + workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name) + ).toStrictEqual({ tasks: { executed: expect.any(Number), executing: 0, failed: 0, queued: 0, + sequentiallyStolen: 0, stolen: 0 }, runTime: { @@ -1540,14 +1629,15 @@ describe('Abstract pool test suite', () => { } }) expect( - workerNode.getTaskFunctionWorkerUsage(name).tasks.executed + workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name) + .tasks.executed ).toBeGreaterThan(0) } expect( workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME) ).toStrictEqual( workerNode.getTaskFunctionWorkerUsage( - workerNode.info.taskFunctionNames[1] + workerNode.info.taskFunctionsProperties[1].name ) ) } @@ -1558,7 +1648,7 @@ describe('Abstract pool test suite', () => { const pool = new DynamicClusterPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) const workerNodeKey = 0 await expect( @@ -1571,19 +1661,23 @@ describe('Abstract pool test suite', () => { const pool = new DynamicClusterPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) const workerNodeKey = 0 await expect( pool.sendTaskFunctionOperationToWorker(workerNodeKey, { taskFunctionOperation: 'add', - taskFunctionName: 'empty', + taskFunctionProperties: { name: 'empty' }, taskFunction: (() => {}).toString() }) ).resolves.toBe(true) expect( - pool.workerNodes[workerNodeKey].info.taskFunctionNames - ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty']) + pool.workerNodes[workerNodeKey].info.taskFunctionsProperties + ).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'test' }, + { name: 'empty' } + ]) await pool.destroy() }) @@ -1591,20 +1685,20 @@ describe('Abstract pool test suite', () => { const pool = new DynamicClusterPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, - './tests/worker-files/cluster/testWorker.js' + './tests/worker-files/cluster/testWorker.cjs' ) await expect( pool.sendTaskFunctionOperationToWorkers({ taskFunctionOperation: 'add', - taskFunctionName: 'empty', + taskFunctionProperties: { name: 'empty' }, taskFunction: (() => {}).toString() }) ).resolves.toBe(true) for (const workerNode of pool.workerNodes) { - expect(workerNode.info.taskFunctionNames).toStrictEqual([ - DEFAULT_TASK_NAME, - 'test', - 'empty' + expect(workerNode.info.taskFunctionsProperties).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'test' }, + { name: 'empty' } ]) } await pool.destroy()